You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/04/12 22:29:43 UTC

svn commit: r528222 [5/5] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: examples/ main/ main/activemq/connector/openwire/ main/activemq/connector/openwire/commands/ main/activemq/connector/stomp/ main/activemq/connector/stomp/commands/ main/activ...

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <activemq/transport/DummyTransport.h>
+#include <activemq/support/LibraryInit.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+DummyTransport* DummyTransport::instance = NULL;
+
+////////////////////////////////////////////////////////////////////////////////
+DummyTransport::DummyTransport( ResponseBuilder* responseBuilder ,
+                                bool own,
+                                bool useDefOutgoing ){
+    // Begin with no listeners; keep the builder and the ownership flag.
+    this->commandListener = NULL;
+    this->exceptionListener = NULL;
+    this->outgoingCommandListener = NULL;
+    this->responseBuilder = responseBuilder;
+    this->own = own;
+    this->nextCommandId = 0;
+
+    // Publish this object as the one handed out by getInstance().
+    DummyTransport::instance = this;
+    // Optionally send outgoing commands through the built-in listener.
+    if( !useDefOutgoing ){ return; }
+    outgoingCommandListener = &defaultListener;
+    defaultListener.setTransport( this );
+    defaultListener.setResponseBuilder( responseBuilder );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+DummyTransport::~DummyTransport(){
+    // Unpublish first so getInstance() never returns a destroyed transport.
+    if( instance == this ){ instance = NULL; }
+    // Release the response builder only when this transport owns it.
+    if( own ){ delete responseBuilder; }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int DummyTransport::getNextCommandId() throw ( exceptions::ActiveMQException ) {
+    // Returns the pre-incremented nextCommandId, updated under commandIdMutex.
+    try{
+        synchronized( &commandIdMutex ){
+            return ++nextCommandId;
+        }
+
+        // Should never get here, but some compilers aren't
+        // smart enough to figure out we'll never get here.
+        return 0;
+    }
+    AMQ_CATCHALL_THROW( transport::CommandIOException ) // presumably rethrows as CommandIOException - confirm against the macro
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DummyTransport::oneway( Command* command )
+        throw(CommandIOException, exceptions::UnsupportedOperationException)
+{
+    // Without an outgoing listener there is nothing to do with the command.
+    if( outgoingCommandListener == NULL ){
+        return;
+    }
+    // The command is forwarded untouched: no command id is assigned and the
+    // response-required flag is not modified here.
+    outgoingCommandListener->onCommand( command );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Response* DummyTransport::request( Command* command )
+    throw(CommandIOException,
+          exceptions::UnsupportedOperationException)
+{   // Answers the command directly through the configured response builder.
+    if( responseBuilder != NULL ){
+        command->setCommandId( getNextCommandId() );  // stamp a fresh id
+        command->setResponseRequired( true );
+        return responseBuilder->buildResponse( command );  // no listener is involved
+    }
+    // Without a response builder the request cannot be answered.
+    throw CommandIOException( __FILE__, __LINE__,
+        "no response builder available" );
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.cpp
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.h Thu Apr 12 13:29:39 2007
@@ -27,22 +27,23 @@
 #include <activemq/concurrent/Mutex.h>
 #include <activemq/concurrent/Thread.h>
 #include <activemq/util/Config.h>
+#include <activemq/concurrent/CountDownLatch.h>
 
 namespace activemq{
 namespace transport{
-    
+
     class DummyTransport : public Transport{
-    
+
     public:
-    
+
         class ResponseBuilder{
         public:
             virtual ~ResponseBuilder(){}
-            
+
             virtual Response* buildResponse( const Command* cmd ) = 0;
         };
 
-        class InternalCommandListener : 
+        class InternalCommandListener :
             public CommandListener,
             public concurrent::Thread
         {
@@ -52,17 +53,21 @@
             ResponseBuilder* responseBuilder;
             concurrent::Mutex mutex;
             Command* command;
+            Response* response;
             bool done;
+            concurrent::CountDownLatch startedLatch;
 
         public:
 
-            InternalCommandListener(void) {
+            InternalCommandListener(void) : startedLatch(1) {
                 command = NULL;
+                response = NULL;
                 transport = NULL;
                 responseBuilder = NULL;
                 done = false;
-                
+
                 this->start();
+                startedLatch.await();
             }
 
             virtual ~InternalCommandListener() {
@@ -72,6 +77,8 @@
                     mutex.notifyAll();
                 }
                 this->join();
+
+                delete response;
             }
 
             void setTransport( DummyTransport* transport ){
@@ -87,9 +94,13 @@
                 synchronized( &mutex )
                 {
                     this->command = command;
-                    
+                    // Create a response now before the caller has a
+                    // chance to destroy the command.
+                    this->response = 
+                        responseBuilder->buildResponse( command );
+
                     mutex.notifyAll();
-                }                
+                }
             }
 
             void run(void)
@@ -100,35 +111,31 @@
                     {
                         while( !done )
                         {
+                            startedLatch.countDown();
                             mutex.wait();
-                            
+
                             if( command == NULL )
                             {
                                 continue;
                             }
-                            
-                            concurrent::Thread::sleep( 100 );
-                            
-                            if( responseBuilder != NULL && 
-                                transport != NULL )
+
+                            // If we created a response then send it.
+                            if( response != NULL && transport != NULL )
                             {
-                                transport->fireCommand( 
-                                    responseBuilder->buildResponse( 
-                                        command ) );
-                                        
-                                command = NULL;
-                                
-                                return;
+                                transport->fireCommand( this->response );
                             }
+
+                            this->response = NULL;
+                            this->command = NULL;
                         }
                     }
                 }
                 AMQ_CATCHALL_NOTHROW()
             }
         };
-        
+
     private:
-    
+
         ResponseBuilder* responseBuilder;
         CommandListener* commandListener;
         CommandListener* outgoingCommandListener;
@@ -137,112 +144,67 @@
         concurrent::Mutex commandIdMutex;
         bool own;
         InternalCommandListener defaultListener;
-        
+        static DummyTransport* instance;
+
     public:
-    
-        DummyTransport( ResponseBuilder* responseBuilder , 
-                        bool own = false,
-                        bool useDefOutgoing = false ){
-            
-            this->responseBuilder = NULL;
-            this->commandListener = NULL;
-            this->outgoingCommandListener = NULL;
-            this->exceptionListener = NULL;
-            this->responseBuilder = responseBuilder;
-            this->own = own;
-            this->nextCommandId = 0;
-            if( useDefOutgoing )
-            {
-                this->outgoingCommandListener = &defaultListener;
-                this->defaultListener.setTransport( this );
-                this->defaultListener.setResponseBuilder( responseBuilder );
-            }
-        }
-        
-        virtual ~DummyTransport(){
-            if( own ){
-                delete responseBuilder;
-            }
+
+        static DummyTransport* getInstance() {
+            return instance;
         }
-        
+
+        DummyTransport( ResponseBuilder* responseBuilder ,
+                        bool own = false,
+                        bool useDefOutgoing = false );
+
+        virtual ~DummyTransport();
+
         void setResponseBuilder( ResponseBuilder* responseBuilder ){
             this->responseBuilder = responseBuilder;
         }
-        
-        unsigned int getNextCommandId() throw (exceptions::ActiveMQException){
-            
-            try{
-                synchronized( &commandIdMutex ){
-                    return ++nextCommandId;
-                }
-                
-                // Should never get here, but some compilers aren't
-                // smart enough to figure out we'll never get here.
-                return 0;
-            }
-            AMQ_CATCHALL_THROW( transport::CommandIOException )
-        }
-        
-        virtual void oneway( Command* command ) 
-                throw(CommandIOException, exceptions::UnsupportedOperationException)
-        {            
-            if( outgoingCommandListener != NULL ){
-                
-                command->setCommandId( getNextCommandId() );
-                command->setResponseRequired( false );
-                outgoingCommandListener->onCommand( command );
-                return;
-            }
-        }
-        
-        virtual Response* request( Command* command ) 
-            throw(CommandIOException, 
-                  exceptions::UnsupportedOperationException)
-        {
-            if( responseBuilder != NULL ){
-                command->setCommandId( getNextCommandId() );
-                command->setResponseRequired( true );
-                return responseBuilder->buildResponse( command );
-            }
-            
-            throw CommandIOException( __FILE__, __LINE__,
-                "no response builder available" );
-        }
-        
+
+        unsigned int getNextCommandId() throw (exceptions::ActiveMQException);
+
+        virtual void oneway( Command* command )
+                throw(CommandIOException, exceptions::UnsupportedOperationException);
+
+        virtual Response* request( Command* command )
+            throw(CommandIOException,
+                  exceptions::UnsupportedOperationException);
+
         virtual void setCommandListener( CommandListener* listener ){
             this->commandListener = listener;
         }
-        
+
         virtual void setOutgoingCommandListener( CommandListener* listener ){
             outgoingCommandListener = listener;
         }
-        
+
         virtual void setCommandReader( CommandReader* reader AMQCPP_UNUSED){}
-        
+
         virtual void setCommandWriter( CommandWriter* writer AMQCPP_UNUSED){}
-        
-        virtual void setTransportExceptionListener( 
+
+        virtual void setTransportExceptionListener(
             TransportExceptionListener* listener )
         {
             this->exceptionListener = listener;
         }
-        
+
         virtual void fireCommand( Command* cmd ){
             if( commandListener != NULL ){
                 commandListener->onCommand( cmd );
             }
         }
-        
+
         virtual void fireException( const exceptions::ActiveMQException& ex ){
             if( exceptionListener != NULL ){
                 exceptionListener->onTransportException( this, ex );
             }
         }
-        
+
         virtual void start() throw (cms::CMSException){}
         virtual void close() throw (cms::CMSException){}
     };
-    
+
 }}
 
 #endif /*ACTIVEMQ_TANSPORT_DUMMYTRANSPORT_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.cpp Thu Apr 12 13:29:39 2007
@@ -14,14 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 #include "DummyTransportFactory.h"
 #include <activemq/support/LibraryInit.h>
+#include <activemq/connector/stomp/StompResponseBuilder.h>
+#include <activemq/transport/DummyTransport.h>
 
 using namespace activemq;
 using namespace activemq::transport;
+using namespace activemq::util;
 
 ////////////////////////////////////////////////////////////////////////////////
-//TransportFactoryMapRegistrar DummyTransportFactory::registrar(
-//    "dummy", new DummyTransportFactory());
-//
+Transport* DummyTransportFactory::createTransport(
+    const activemq::util::Properties& properties,
+    Transport* next,
+    bool own ) throw ( exceptions::ActiveMQException )
+{
+    // The dummy transport never uses the "next" transport, so release
+    // it right away when ownership was handed to this factory.
+    if( own ) {
+        delete next;
+    }
+
+    // Only the "stomp" wire format (also the default value) gets a
+    // response builder; any other value leaves the builder NULL.
+    DummyTransport::ResponseBuilder* builder = NULL;
+    const std::string wireFormat =
+        properties.getProperty( "wireFormat", "stomp" );
+    if( wireFormat == "stomp" ) {
+        builder = new connector::stomp::StompResponseBuilder(
+            properties.getProperty(
+                "transport.sessionId", "testSessionId" ) );
+    }
+    // The transport owns the builder and uses its default outgoing listener.
+    return new DummyTransport( builder, true, true );
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.h Thu Apr 12 13:29:39 2007
@@ -18,52 +18,32 @@
 #ifndef ACTIVEMQ_TRANSPORT_DUMMYTRANSPORTFACTORY_H_
 #define ACTIVEMQ_TRANSPORT_DUMMYTRANSPORTFACTORY_H_
 
-#include <activemq/transport/DummyTransport.h>
 #include <activemq/transport/TransportFactory.h>
-#include <activemq/transport/TransportFactoryMapRegistrar.h>
-#include <activemq/connector/stomp/StompResponseBuilder.h>
 #include <activemq/support/LibraryInit.h>
 
 namespace activemq{
 namespace transport{
-    
+
     /**
      * Manufactures DummyTransports, which are objects that
      * read from input streams and write to output streams.
      */
     class DummyTransportFactory : public TransportFactory{
-    private:
-    
- //       static TransportFactoryMapRegistrar registrar;
-        
     public:
-        
+
         virtual ~DummyTransportFactory(){}
-        
+
         /**
-         * Creates a Transport instance. 
+         * Creates a Transport instance.
          * @param properties The properties for the transport.
          */
-        virtual Transport* createTransport( 
-            const activemq::util::Properties& properties )
-        {
-            std::string wireFormat = 
-                properties.getProperty( "wireFormat", "stomp" );
-
-            DummyTransport::ResponseBuilder* builder = NULL;
-
-            if( wireFormat == "stomp" )
-            {
-                builder = new connector::stomp::StompResponseBuilder(
-                    properties.getProperty( 
-                        "transport.sessionId", "testSessionId" ) );
-            }
-            
-            return new DummyTransport( builder, true );
-        }
+        virtual Transport* createTransport(
+            const activemq::util::Properties& properties,
+            Transport* next = NULL,
+            bool own = true ) throw ( exceptions::ActiveMQException );
 
     };
-    
+
 }}
 
 #endif /*ACTIVEMQ_TRANSPORT_DUMMYTRANSPORTFACTORY_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.h Thu Apr 12 13:29:39 2007
@@ -47,27 +47,32 @@
         CPPUNIT_TEST_SUITE_END();
 
     public:
-    
+
         class MyCommand : public Command{
         public:
             MyCommand(){ c = 0; }
             virtual ~MyCommand(){}
-            
+
             char c;
-            
+
             virtual void setCommandId( int id AMQCPP_UNUSED){}
             virtual int getCommandId() const{ return 0; }
-            
+
             virtual void setResponseRequired( const bool required AMQCPP_UNUSED){}
             virtual bool isResponseRequired() const{ return false; }
             virtual std::string toString() const{ return ""; }
+            virtual Command* cloneCommand() const{
+                MyCommand* command = new MyCommand;
+                command->c = c;
+                return command;
+            }
         };
-        
+
         class MyCommandListener : public CommandListener{
         public:
             MyCommandListener(){}
             virtual ~MyCommandListener(){}
-            
+
             std::string str;
             virtual void onCommand( Command* command ){
                 const MyCommand* cmd = dynamic_cast<const MyCommand*>(command);
@@ -75,21 +80,21 @@
                 delete command;
             }
         };
-        
+
         class MyCommandReader : public CommandReader{
         private:
-       
+
             /**
              * The target input stream.
              */
             io::InputStream* inputStream;
-            
+
         public:
             MyCommandReader(){ throwException = false; }
             virtual ~MyCommandReader(){}
-            
+
             bool throwException;
-    
+
             virtual void setInputStream(io::InputStream* is){
                 inputStream = is;
             }
@@ -99,28 +104,28 @@
             }
 
             virtual Command* readCommand( void ) throw (CommandIOException){
-              
+
                 try{
                     if( throwException ){
                         throw CommandIOException();
                     }
-                    
+
                     synchronized( inputStream ){
                         MyCommand* command = new MyCommand();
                         try{
                             command->c = inputStream->read();
                         } catch( exceptions::ActiveMQException& ex ){
-                            
+
                             // Free the memory.
                             delete command;
-                            
+
                             ex.setMark( __FILE__, __LINE__ );
                             throw ex;
                         }
-                        
+
                         return command;
                     }
-                    
+
                     assert(false);
                     return NULL;
                 }catch( exceptions::ActiveMQException& ex ){
@@ -130,45 +135,45 @@
                 }
             }
 
-            virtual std::size_t read(unsigned char* buffer AMQCPP_UNUSED, 
-                             std::size_t count AMQCPP_UNUSED) 
+            virtual std::size_t read(unsigned char* buffer AMQCPP_UNUSED,
+                             std::size_t count AMQCPP_UNUSED)
                 throw( io::IOException ) {
                 return 0;
             }
-           
+
             virtual unsigned char readByte() throw(io::IOException) {
                 return 0;
             }
         };
-        
+
         class MyCommandWriter : public CommandWriter{
         private:
-        
+
             /**
              * Target output stream.
              */
             io::OutputStream* outputStream;
-            
+
         public:
             virtual ~MyCommandWriter(){}
 
             virtual void setOutputStream(io::OutputStream* os){
                 outputStream = os;
             }
-          
+
             virtual io::OutputStream* getOutputStream(void){
                 return outputStream;
             }
 
-            virtual void writeCommand( Command* command ) 
+            virtual void writeCommand( Command* command )
                 throw (CommandIOException)
             {
                 try{
                     synchronized( outputStream ){
-                                            
-                        const MyCommand* m = 
+
+                        const MyCommand* m =
                             dynamic_cast<const MyCommand*>(command);
-                        outputStream->write( m->c );                    
+                        outputStream->write( m->c );
                     }
                 }catch( exceptions::ActiveMQException& ex ){
                     CommandIOException cx( ex );
@@ -177,25 +182,25 @@
                 }
             }
 
-            virtual void write( const unsigned char* buffer AMQCPP_UNUSED, 
-                                std::size_t count AMQCPP_UNUSED) 
+            virtual void write( const unsigned char* buffer AMQCPP_UNUSED,
+                                std::size_t count AMQCPP_UNUSED)
                 throw(io::IOException) {}
-           
+
             virtual void writeByte(unsigned char v AMQCPP_UNUSED) throw(io::IOException) {}
         };
-        
+
         class MyExceptionListener : public TransportExceptionListener{
         public:
-        
+
             Transport* transport;
             concurrent::Mutex mutex;
-            
+
             MyExceptionListener(){
                 transport = NULL;
             }
             virtual ~MyExceptionListener(){}
-            
-            virtual void onTransportException( Transport* source, 
+
+            virtual void onTransportException( Transport* source,
                         const exceptions::ActiveMQException& ex AMQCPP_UNUSED){
                 transport = source;
 
@@ -204,16 +209,16 @@
                    mutex.notify();
                 }
             }
-        };        
-    
+        };
+
     public:
 
         virtual ~IOTransportTest(){}
-        
-        // This will just test that we can start and stop the 
+
+        // This will just test that we can start and stop the
         // transport without any exceptions.
         void testStartClose(){
-            
+
             io::BlockingByteArrayInputStream is;
             io::ByteArrayOutputStream os;
             MyCommandListener listener;
@@ -227,16 +232,16 @@
             transport.setInputStream( &is );
             transport.setOutputStream( &os );
             transport.setTransportExceptionListener( &exListener );
-            
+
             transport.start();
-            
+
             concurrent::Thread::sleep( 50 );
-            
+
             transport.close();
         }
-        
+
         void testRead(){
-            
+
             io::BlockingByteArrayInputStream is;
             io::ByteArrayOutputStream os;
             MyCommandListener listener;
@@ -250,11 +255,11 @@
             transport.setInputStream( &is );
             transport.setOutputStream( &os );
             transport.setTransportExceptionListener( &exListener );
-            
+
             transport.start();
-            
+
             concurrent::Thread::sleep( 10 );
-            
+
             unsigned char buffer[10] = { '1', '2', '3', '4', '5', '6', '7', '8', '9', '0' };
             try{
                 synchronized( &is ){
@@ -263,16 +268,16 @@
             }catch( exceptions::ActiveMQException& ex ){
                 ex.setMark( __FILE__, __LINE__ );
             }
-            
+
             concurrent::Thread::sleep( 100 );
-            
+
             CPPUNIT_ASSERT( listener.str == "1234567890" );
-            
+
             transport.close();
         }
-        
+
         void testWrite(){
-            
+
             io::BlockingByteArrayInputStream is;
             io::ByteArrayOutputStream os;
             MyCommandListener listener;
@@ -286,9 +291,9 @@
             transport.setInputStream( &is );
             transport.setOutputStream( &os );
             transport.setTransportExceptionListener( &exListener );
-            
+
             transport.start();
-            
+
             MyCommand cmd;
             cmd.c = '1';
             transport.oneway( &cmd );
@@ -300,7 +305,7 @@
             transport.oneway( &cmd );
             cmd.c = '5';
             transport.oneway( &cmd );
-            
+
             const unsigned char* bytes = os.getByteArray();
             std::size_t size = os.getByteArraySize();
             CPPUNIT_ASSERT( size >= 5 );
@@ -309,12 +314,12 @@
             CPPUNIT_ASSERT( bytes[2] == '3' );
             CPPUNIT_ASSERT( bytes[3] == '4' );
             CPPUNIT_ASSERT( bytes[4] == '5' );
-            
+
             transport.close();
         }
-        
+
         void testException(){
-            
+
             io::BlockingByteArrayInputStream is;
             io::ByteArrayOutputStream os;
             MyCommandListener listener;
@@ -329,18 +334,18 @@
             transport.setInputStream( &is );
             transport.setOutputStream( &os );
             transport.setTransportExceptionListener( &exListener );
-            
+
             unsigned char buffer[1] = { '1' };
             try{
-                synchronized( &is ){                
+                synchronized( &is ){
                     is.setByteArray( buffer, 1);
                 }
             }catch( exceptions::ActiveMQException& ex ){
                 ex.setMark(__FILE__, __LINE__ );
             }
-            
+
             transport.start();
-            
+
             synchronized(&exListener.mutex)
             {
                if(exListener.transport != &transport)
@@ -348,13 +353,13 @@
                   exListener.mutex.wait(1000);
                }
             }
-                                    
+
             CPPUNIT_ASSERT( exListener.transport == &transport );
-            
+
             transport.close();
         }
     };
-    
+
 }}
 
 #endif /*ACTIVEMQ_COMMANDS_IOTRANSPORTTEST_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapRegistrarTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapRegistrarTest.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapRegistrarTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapRegistrarTest.h Thu Apr 12 13:29:39 2007
@@ -27,37 +27,41 @@
 
 namespace activemq{
 namespace transport{
-	
-	class TransportFactoryMapRegistrarTest : public CppUnit::TestFixture {
-		
-	  CPPUNIT_TEST_SUITE( TransportFactoryMapRegistrarTest );
-	  CPPUNIT_TEST( test );
-	  CPPUNIT_TEST_SUITE_END();
-	  
-	public:
-	
-		class TestTransportFactory : public TransportFactory
-		{
-		public:
-		
-		   virtual Transport* createTransport(
-		      const activemq::util::Properties& properties AMQCPP_UNUSED ) { return NULL; };
-		};
-        
+
+    class TransportFactoryMapRegistrarTest : public CppUnit::TestFixture {
+
+        CPPUNIT_TEST_SUITE( TransportFactoryMapRegistrarTest );
+        CPPUNIT_TEST( test );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        class TestTransportFactory : public TransportFactory
+        {
+        public:
+            // Stub factory: ignores every argument and never builds a transport.
+            virtual Transport* createTransport(
+                const activemq::util::Properties& properties AMQCPP_UNUSED,
+                Transport* next AMQCPP_UNUSED = NULL,
+                bool own AMQCPP_UNUSED = true ) throw ( exceptions::ActiveMQException ) {
+                return NULL;
+            }
+        };
+
         virtual ~TransportFactoryMapRegistrarTest(){}
-		
-		void test(){
-			
-			{
-				TransportFactoryMapRegistrar registrar("Test", new TestTransportFactory());
-			
-				CPPUNIT_ASSERT( TransportFactoryMap::getInstance().lookup("Test") != NULL);
-			}
-			
-			CPPUNIT_ASSERT( TransportFactoryMap::getInstance().lookup( "Test" ) == NULL );
-		}
-	};
-	
+
+        void test(){
+
+            {
+                TransportFactoryMapRegistrar registrar("Test", new TestTransportFactory());
+
+                CPPUNIT_ASSERT( TransportFactoryMap::getInstance().lookup("Test") != NULL);
+            }
+
+            CPPUNIT_ASSERT( TransportFactoryMap::getInstance().lookup( "Test" ) == NULL );
+        }
+    };
+
 }}
 
 #endif /*ACTIVEMQ_TRANSPORT_CONNECTORFACTORYMAPREGISTRARTEST_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapTest.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapTest.h Thu Apr 12 13:29:39 2007
@@ -27,38 +27,43 @@
 
 namespace activemq{
 namespace transport{
-	
-	class TransportFactoryMapTest : public CppUnit::TestFixture {
-		
-	  CPPUNIT_TEST_SUITE( TransportFactoryMapTest );
-	  CPPUNIT_TEST( test );
-	  CPPUNIT_TEST_SUITE_END();	  
-		
-	public:
-	
-		class TestTransportFactory : public TransportFactory
-		{
-		public:
-		
-		   virtual Transport* createTransport(
-		      const activemq::util::Properties& properties AMQCPP_UNUSED) { return NULL; };
-		};
-        
+
+    class TransportFactoryMapTest : public CppUnit::TestFixture {
+
+        CPPUNIT_TEST_SUITE( TransportFactoryMapTest );
+        CPPUNIT_TEST( test );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        class TestTransportFactory : public TransportFactory
+        {
+        public:
+            // Stub factory for the map test: ignores its arguments, returns NULL.
+            virtual Transport* createTransport(
+                const activemq::util::Properties& properties AMQCPP_UNUSED,
+                Transport* next AMQCPP_UNUSED = NULL,
+                bool own AMQCPP_UNUSED = true ) throw ( exceptions::ActiveMQException ) {
+
+                return NULL;
+            }
+        };
+
         virtual ~TransportFactoryMapTest(){}
-		
-		void test(){
-			
-			TransportFactoryMap& factMap = 
+
+        void test(){
+
+            TransportFactoryMap& factMap =
                 TransportFactoryMap::getInstance();
-			TestTransportFactory testFactory;
-			
-			factMap.registerTransportFactory( "test", &testFactory );
-			
-			CPPUNIT_ASSERT( factMap.lookup( "test" ) == &testFactory );
-			
-			std::vector<std::string> names;
-			CPPUNIT_ASSERT( factMap.getFactoryNames( names ) >= 1 );
-			
+            TestTransportFactory testFactory;
+
+            factMap.registerTransportFactory( "test", &testFactory );
+
+            CPPUNIT_ASSERT( factMap.lookup( "test" ) == &testFactory );
+
+            std::vector<std::string> names;
+            CPPUNIT_ASSERT( factMap.getFactoryNames( names ) >= 1 );
+
             bool found = false;
             for( unsigned int i = 0; i < names.size(); ++i )
             {
@@ -68,13 +73,13 @@
                     break;
                 }
             }
-			CPPUNIT_ASSERT( found );
-			
-			factMap.unregisterTransportFactory( "test" );
-			CPPUNIT_ASSERT( factMap.lookup( "test" ) == NULL );			
-		}
-	};
-	
+            CPPUNIT_ASSERT( found );
+
+            factMap.unregisterTransportFactory( "test" );
+            CPPUNIT_ASSERT( factMap.lookup( "test" ) == NULL );
+        }
+    };
+
 }}
 
 #endif /*ACTIVEMQ_TRANSPORT_TRANSPORTFACTORYMAPTEST_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ResponseCorrelatorTest.h"
+
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::filters::ResponseCorrelatorTest );
+

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATORTEST_H_
+#define ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATORTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <activemq/transport/filters/ResponseCorrelator.h>
+#include <activemq/concurrent/Thread.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/exceptions/UnsupportedOperationException.h>
+#include <activemq/util/Config.h>
+#include <queue>
+#include <set>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+    class ResponseCorrelatorTest : public CppUnit::TestFixture {
+
+        CPPUNIT_TEST_SUITE( ResponseCorrelatorTest );
+        CPPUNIT_TEST( testBasics );
+        CPPUNIT_TEST( testOneway );
+        CPPUNIT_TEST( testTransportException );
+        CPPUNIT_TEST( testMultiRequests );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        // Minimal Command for these tests: it only stores an id and a response flag.
+        class MyCommand : public Command{
+        private:
+            unsigned int commandId;
+            bool responseRequired;
+
+        public:
+            // Defined initial state: the getters must never read indeterminate values.
+            MyCommand() : commandId( 0 ), responseRequired( false ){}
+
+            virtual void setCommandId( int id ){
+                commandId = id;
+            }
+            virtual int getCommandId() const{
+                return commandId;
+            }
+            virtual void setResponseRequired( const bool required ){
+                responseRequired = required;
+            }
+            virtual bool isResponseRequired() const{
+                return responseRequired;
+            }
+            virtual std::string toString() const{ return ""; }
+            // Returns a new heap-allocated MyCommand carrying the same id and flag.
+            virtual Command* cloneCommand() const{
+                MyCommand* command = new MyCommand;
+                command->commandId = commandId;
+                command->responseRequired = responseRequired;
+                return command;
+            }
+        };
+
+        // Minimal Response for these tests: an id, a response flag and a correlation id.
+        class MyResponse : public Response{
+        private:
+            unsigned int commandId;
+            bool responseRequired;
+            unsigned int corrId;
+
+        public:
+            // Defined initial state: the getters must never read indeterminate values.
+            MyResponse() : commandId( 0 ), responseRequired( false ), corrId( 0 ){}
+            virtual void setCommandId( int id ){
+                commandId = id;
+            }
+            virtual int getCommandId() const{
+                return commandId;
+            }
+            virtual void setResponseRequired( const bool required ){
+                responseRequired = required;
+            }
+            virtual bool isResponseRequired() const{
+                return responseRequired;
+            }
+
+            virtual int getCorrelationId() const{
+                return corrId;
+            }
+            virtual void setCorrelationId( int corrId ){
+                this->corrId = corrId;
+            }
+
+            virtual std::string toString() const{ return ""; }
+            // Returns a new heap-allocated MyResponse carrying the same three values.
+            virtual Command* cloneCommand() const{
+                MyResponse* command = new MyResponse;
+                command->commandId = commandId;
+                command->responseRequired = responseRequired;
+                command->corrId = corrId;
+                return command;
+            }
+        };
+
+        // Fake Transport for these tests: oneway() queues commands and run(), executed
+        // on a worker thread, relays them (plus a response when required) to the listener.
+        class MyTransport
+        :
+            public Transport,
+            public concurrent::Runnable{
+        public:
+            CommandReader* reader;
+            CommandWriter* writer;
+            CommandListener* listener;
+            TransportExceptionListener* exListener;
+            concurrent::Thread* thread;
+            concurrent::Mutex mutex;
+            concurrent::Mutex startedMutex;
+            bool done;
+            std::queue<Command*> requests;
+        public:
+            MyTransport(){
+                reader = NULL;
+                writer = NULL;
+                listener = NULL;
+                exListener = NULL;
+                thread = NULL;
+                done = false;
+            }
+            // Stops and joins the worker thread, if one is running.
+            virtual ~MyTransport(){
+
+                close();
+            }
+            // Queues the command pointer (no copy is made) and wakes the worker thread.
+            virtual void oneway( Command* command )
+                throw(CommandIOException, exceptions::UnsupportedOperationException)
+            {
+                synchronized( &mutex ){
+                    requests.push( command );
+                    mutex.notifyAll();
+                }
+            }
+            // Not supported by this transport: always throws UnsupportedOperationException.
+            virtual Response* request( Command* command AMQCPP_UNUSED)
+                throw(CommandIOException, exceptions::UnsupportedOperationException)
+            {
+                throw exceptions::UnsupportedOperationException(
+                    __FILE__,
+                    __LINE__,
+                    "stuff" );
+            }
+            // Sets the listener that run() hands responses and relayed commands to.
+            virtual void setCommandListener( CommandListener* listener ){
+                this->listener = listener;
+            }
+            // Stored only; nothing else in this class uses the reader.
+            virtual void setCommandReader( CommandReader* reader ){
+                this->reader = reader;
+            }
+            // Stored only; nothing else in this class uses the writer.
+            virtual void setCommandWriter( CommandWriter* writer ){
+                this->writer = writer;
+            }
+            // Sets the listener that run() notifies when it catches an exception.
+            virtual void setTransportExceptionListener(
+                TransportExceptionListener* listener )
+            {
+                this->exListener = listener;
+            }
+            // Stops any previous worker, then launches a new thread that executes run().
+            virtual void start() throw( cms::CMSException ){
+                close();
+
+                done = false;
+
+                thread = new concurrent::Thread( this );
+                thread->start();
+            }
+            // Flags the worker loop to exit, wakes it, then joins and deletes the thread.
+            virtual void close() throw( cms::CMSException ){
+                // NOTE(review): done is written without holding mutex, while run() reads it under the lock.
+                done = true;
+
+                if( thread != NULL ){
+                    synchronized( &mutex ){
+                        mutex.notifyAll();
+                    }
+                    thread->join();
+                    delete thread;
+                    thread = NULL;
+                }
+            }
+            // Builds a heap-allocated MyResponse whose correlation id is the command's id.
+            virtual Response* createResponse( Command* command ){
+
+                MyResponse* resp = new MyResponse();
+                resp->setCorrelationId( command->getCommandId() );
+                resp->setResponseRequired( false );
+                return resp;
+            }
+            // Worker loop: relays queued commands until done is set or an exception is caught.
+            virtual void run(){
+
+                try{
+                    // Tell anyone waiting on startedMutex that the worker is running.
+                    synchronized(&startedMutex)
+                    {
+                       startedMutex.notifyAll();
+                    }
+
+                    synchronized( &mutex ){
+
+                        while( !done ){
+
+                            if( requests.empty() ){
+                                mutex.wait();
+                            }else{
+
+                                Command* cmd = requests.front();
+                                requests.pop();
+
+                                // Only send a response if one is required.
+                                Response* resp = NULL;
+                                if( cmd->isResponseRequired() ){
+                                    resp = createResponse( cmd );
+                                }
+                                // Drop the lock around the listener callbacks; it is re-taken below.
+                                mutex.unlock();
+
+                                // Send both the response and the original
+                                // command back to the correlator.
+                                if( listener != NULL ){
+                                    if( resp != NULL ){
+                                        listener->onCommand( resp );
+                                    }
+                                    listener->onCommand( cmd );
+                                }
+
+                                mutex.lock();
+                            }
+                        }
+                    }
+                }catch( exceptions::ActiveMQException& ex ){
+                    if( exListener ){
+                        exListener->onTransportException( this, ex );
+                    }
+                }
+                catch( ... ){
+                    if( exListener ){
+                        exceptions::ActiveMQException ex( __FILE__, __LINE__, "stuff" );
+                        exListener->onTransportException( this, ex );
+                    }
+                }
+            }
+        };
+
+        // MyTransport variant whose createResponse() always throws.
+        class MyBrokenTransport : public MyTransport{
+        public:
+            MyBrokenTransport(){}
+            virtual ~MyBrokenTransport(){}
+            // Always throws an ActiveMQException instead of building a response.
+            virtual Response* createResponse( Command* command AMQCPP_UNUSED){
+                throw exceptions::ActiveMQException( __FILE__, __LINE__,
+                    "bad stuff" );
+            }
+        };
+
+        // Test listener: records the ids of received commands and counts transport exceptions.
+        class MyListener
+        :
+            public CommandListener,
+            public TransportExceptionListener{
+        public:
+            // exCount: exceptions reported so far; commands: distinct command ids received.
+            int exCount;
+            std::set<int> commands;
+            concurrent::Mutex mutex;
+
+        public:
+
+            MyListener(){
+                exCount = 0;
+            }
+            virtual ~MyListener(){}
+            virtual void onCommand( Command* command ){
+                // Record the command's id under the lock, then notify a waiter on mutex.
+                synchronized( &mutex ){
+                    commands.insert( command->getCommandId() );
+
+                    mutex.notify();
+                }
+            }
+            // Counts the exception under the lock; source and ex themselves are ignored.
+            virtual void onTransportException(
+                Transport* source AMQCPP_UNUSED,
+                const exceptions::ActiveMQException& ex AMQCPP_UNUSED)
+            {
+                synchronized( &mutex ){
+                    exCount++;
+                }
+            }
+        };
+
+        // Thread that issues a single request for its own command on the given transport.
+        class RequestThread : public concurrent::Thread{
+        public:
+            Transport* transport;
+            MyCommand cmd;
+            Response* resp;
+        public:
+
+            RequestThread(){
+                transport = NULL;
+                resp = NULL;
+            }
+            virtual ~RequestThread(){
+                join();
+                // The response stored by run() is deleted here.
+                if( resp != NULL ){
+                    delete resp;
+                    resp = NULL;
+                }
+            }
+
+            void setTransport( Transport* transport ){
+                this->transport = transport;
+            }
+            // Sends cmd through transport->request() and keeps the result in resp.
+            void run(){
+                // NOTE(review): the assertion below fires on this worker thread; confirm CppUnit sees it.
+                try{
+                    resp = transport->request(&cmd);
+                }catch( ... ){
+                    CPPUNIT_ASSERT( false );
+                }
+            }
+        };
+
+    public:
+
+        virtual ~ResponseCorrelatorTest(){}
+
+        void testBasics(){
+            // One request through the correlator: checks the response and the relayed command.
+            try{
+                // Build the chain and check the transport's listeners point at the correlator.
+                MyListener listener;
+                MyTransport transport;
+                ResponseCorrelator correlator( &transport, false );
+                correlator.setCommandListener( &listener );
+                correlator.setTransportExceptionListener( &listener );
+                CPPUNIT_ASSERT( transport.listener == &correlator );
+                CPPUNIT_ASSERT( transport.exListener == &correlator );
+
+                // Take startedMutex before start() and wait until run() signals it.
+                synchronized(&transport.startedMutex)
+                {
+                    // Start the transport.
+                    correlator.start();
+                    transport.startedMutex.wait();
+                }
+
+                // Send one request.
+                MyCommand cmd;
+                Response* resp = correlator.request( &cmd );
+                CPPUNIT_ASSERT( resp != NULL );
+                CPPUNIT_ASSERT( resp->getCorrelationId() == cmd.getCommandId() );
+
+                // Wait to get the message back asynchronously.
+                concurrent::Thread::sleep( 100 );
+
+                // Since our transport relays our original command back at us as a
+                // non-response message, check that exactly one command reached the
+                // listener and that no transport exception was reported.
+                CPPUNIT_ASSERT( listener.commands.size() == 1 );
+                CPPUNIT_ASSERT( listener.exCount == 0 );
+
+                correlator.close();
+
+                // Destroy the response.
+                delete resp;
+            }
+            AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+            AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+        }
+
+        void testOneway(){
+            // Sends many oneway commands and checks that all of them are relayed back.
+            try{
+
+                MyListener listener;
+                MyTransport transport;
+                ResponseCorrelator correlator( &transport, false );
+                correlator.setCommandListener( &listener );
+                correlator.setTransportExceptionListener( &listener );
+                CPPUNIT_ASSERT( transport.listener == &correlator );
+                CPPUNIT_ASSERT( transport.exListener == &correlator );
+
+                // Take startedMutex before start() and wait until run() signals it.
+                synchronized(&transport.startedMutex)
+                {
+                    // Start the transport.
+                    correlator.start();
+
+                    transport.startedMutex.wait();
+                }
+
+                // Send many oneway requests (we'll get them back asynchronously).
+                const unsigned int numCommands = 1000;
+                MyCommand commands[numCommands];
+                for( unsigned int ix=0; ix<numCommands; ix++ ){
+                    correlator.oneway( &commands[ix] );
+                }
+
+                // Give the thread a little time to get all the messages back.
+                concurrent::Thread::sleep( 500 );
+
+                // Make sure we got them all back (the listener stores distinct ids only).
+                CPPUNIT_ASSERT( listener.commands.size() == numCommands );
+                CPPUNIT_ASSERT( listener.exCount == 0 );
+
+                correlator.close();
+            }
+            AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+            AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+        }
+
+        void testTransportException(){
+            // Uses MyBrokenTransport to check how a transport-side failure is reported.
+            try{
+
+                MyListener listener;
+                MyBrokenTransport transport;
+                ResponseCorrelator correlator( &transport, false );
+                correlator.setCommandListener( &listener );
+                correlator.setTransportExceptionListener( &listener );
+                CPPUNIT_ASSERT( transport.listener == &correlator );
+                CPPUNIT_ASSERT( transport.exListener == &correlator );
+
+                // Take startedMutex before start() and wait until run() signals it.
+                synchronized(&transport.startedMutex)
+                {
+                    // Start the transport.
+                    correlator.start();
+
+                    transport.startedMutex.wait();
+                }
+
+                // Send one request; it is expected to fail with a CommandIOException.
+                MyCommand cmd;
+                try{
+                    correlator.request( &cmd );
+                    CPPUNIT_ASSERT(false);
+                }catch( CommandIOException& ex ){
+                    // Expected.
+                }
+
+                // Give the asynchronous exception notification time to arrive.
+                concurrent::Thread::sleep( 200 );
+
+                // MyBrokenTransport throws while creating the response, so the command
+                // is never relayed back: the listener must have received no commands
+                // and exactly one transport exception.
+                CPPUNIT_ASSERT( listener.commands.size() == 0 );
+                CPPUNIT_ASSERT( listener.exCount == 1 );
+
+                correlator.close();
+            }
+            AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+            AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+        }
+
+        void testMultiRequests(){
+            // Issues requests from many threads at once and checks every one is answered.
+            try{
+
+                MyListener listener;
+                MyTransport transport;
+                ResponseCorrelator correlator( &transport, false );
+                correlator.setCommandListener( &listener );
+                correlator.setTransportExceptionListener( &listener );
+                CPPUNIT_ASSERT( transport.listener == &correlator );
+                CPPUNIT_ASSERT( transport.exListener == &correlator );
+
+                // Start the transport.
+                correlator.start();
+
+                // Make sure the start command got down to the thread.
+                CPPUNIT_ASSERT( transport.thread != NULL );
+
+                // start() already ran, so run() may have signalled before we wait; use a timed wait.
+                synchronized(&transport.startedMutex)
+                {
+                    transport.startedMutex.wait(500);
+                }
+
+                // Start all the requester threads.
+                const unsigned int numRequests = 100;
+                RequestThread requesters[numRequests];
+                for( unsigned int ix=0; ix<numRequests; ++ix ){
+                    requesters[ix].setTransport( &correlator );
+                    requesters[ix].start();
+                }
+
+                // Make sure we got all the responses and that they were all
+                // what we expected.
+                for( unsigned int ix=0; ix<numRequests; ++ix ){
+                    requesters[ix].join();
+                    CPPUNIT_ASSERT( requesters[ix].resp != NULL );
+                    CPPUNIT_ASSERT( requesters[ix].cmd.getCommandId() == requesters[ix].resp->getCorrelationId() );
+                }
+                // Wait for the relayed commands, giving up after numRequests timed waits.
+                concurrent::Thread::sleep( 60 );
+                synchronized( &listener.mutex )
+                {
+                    unsigned int count = 0;
+
+                    while( listener.commands.size() != numRequests )
+                    {
+                        listener.mutex.wait( 75 );
+
+                        ++count;
+
+                        if( count == numRequests ) {
+                            break;
+                        }
+                    }
+                }
+
+                // Since our transport relays each original command back at us as a
+                // non-response message, check that the listener saw numRequests
+                // distinct command ids and that no transport exception was reported.
+                CPPUNIT_ASSERT( listener.commands.size() == numRequests );
+                CPPUNIT_ASSERT( listener.exCount == 0 );
+
+                correlator.close();
+            }
+            AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+            AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+        }
+    };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATORTEST_H_*/