You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/04/12 22:29:43 UTC
svn commit: r528222 [5/5] - in
/activemq/activemq-cpp/trunk/activemq-cpp/src: examples/ main/
main/activemq/connector/openwire/
main/activemq/connector/openwire/commands/ main/activemq/connector/stomp/
main/activemq/connector/stomp/commands/ main/activ...
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <activemq/transport/DummyTransport.h>
+#include <activemq/support/LibraryInit.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+// Class-wide pointer to a DummyTransport; NULL until the constructor below
+// stores the object it is building.
+DummyTransport* DummyTransport::instance = NULL;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Builds a transport around the supplied response builder and records the
+ * new object in the static instance pointer.
+ * @param responseBuilder builder consulted by request(); also handed to the
+ *        embedded default listener when useDefOutgoing is set.
+ * @param own when true, the destructor deletes responseBuilder.
+ * @param useDefOutgoing when true, the embedded defaultListener becomes the
+ *        outgoing command listener.
+ */
+DummyTransport::DummyTransport( ResponseBuilder* responseBuilder,
+                                bool own,
+                                bool useDefOutgoing ){
+
+    // No listeners are attached until somebody registers them.
+    commandListener = NULL;
+    outgoingCommandListener = NULL;
+    exceptionListener = NULL;
+
+    this->responseBuilder = responseBuilder;
+    this->own = own;
+    nextCommandId = 0;
+
+    // Make this object reachable through getInstance().
+    DummyTransport::instance = this;
+
+    if( !useDefOutgoing ){
+        return;
+    }
+
+    // Route outgoing commands through the embedded listener and give it
+    // what it needs to answer them.
+    outgoingCommandListener = &defaultListener;
+    defaultListener.setTransport( this );
+    defaultListener.setResponseBuilder( responseBuilder );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Retires the static instance pointer if it still refers to this object and
+ * deletes the response builder when this transport owns it.
+ */
+DummyTransport::~DummyTransport(){
+
+    // The constructor published this object through the static instance
+    // pointer; clear it so getInstance() can never hand out a pointer to a
+    // destroyed transport.  A newer transport that replaced the pointer is
+    // left alone.
+    if( instance == this ){
+        instance = NULL;
+    }
+
+    if( own ){
+        delete responseBuilder;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Hands out the next command id.  The counter is pre-incremented while
+ * commandIdMutex is held, so a transport whose counter starts at 0 returns
+ * 1 first.
+ * @return the freshly incremented id.
+ * Failures inside the try block are handled by
+ * AMQ_CATCHALL_THROW( transport::CommandIOException ).
+ */
+unsigned int DummyTransport::getNextCommandId() throw ( exceptions::ActiveMQException ) {
+
+    try{
+        unsigned int id = 0;
+
+        // Only the increment needs the lock; the value is returned once
+        // the synchronized region has been left.
+        synchronized( &commandIdMutex ){
+            id = ++nextCommandId;
+        }
+
+        return id;
+    }
+    AMQ_CATCHALL_THROW( transport::CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Passes the command to the outgoing command listener, if one is set.
+ * Without a listener the call does nothing.  The command's id and
+ * response-required flag are not modified here.
+ * @param command the command to hand to the listener.
+ */
+void DummyTransport::oneway( Command* command )
+    throw(CommandIOException, exceptions::UnsupportedOperationException)
+{
+    if( outgoingCommandListener == NULL ){
+        return;
+    }
+
+    outgoingCommandListener->onCommand( command );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Stamps the command with a new id, marks it as requiring a response and
+ * asks the response builder for the matching Response.
+ * @param command the command to answer; its id and response-required flag
+ *        are overwritten.
+ * @return whatever responseBuilder->buildResponse() produces for command.
+ * @throws CommandIOException when no response builder is set.
+ */
+Response* DummyTransport::request( Command* command )
+    throw(CommandIOException,
+          exceptions::UnsupportedOperationException)
+{
+    // Nothing can be answered without a builder.
+    if( responseBuilder == NULL ){
+        throw CommandIOException( __FILE__, __LINE__,
+            "no response builder available" );
+    }
+
+    command->setCommandId( getNextCommandId() );
+    command->setResponseRequired( true );
+
+    return responseBuilder->buildResponse( command );
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.cpp
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransport.h Thu Apr 12 13:29:39 2007
@@ -27,22 +27,23 @@
#include <activemq/concurrent/Mutex.h>
#include <activemq/concurrent/Thread.h>
#include <activemq/util/Config.h>
+#include <activemq/concurrent/CountDownLatch.h>
namespace activemq{
namespace transport{
-
+
class DummyTransport : public Transport{
-
+
public:
-
+
class ResponseBuilder{
public:
virtual ~ResponseBuilder(){}
-
+
virtual Response* buildResponse( const Command* cmd ) = 0;
};
- class InternalCommandListener :
+ class InternalCommandListener :
public CommandListener,
public concurrent::Thread
{
@@ -52,17 +53,21 @@
ResponseBuilder* responseBuilder;
concurrent::Mutex mutex;
Command* command;
+ Response* response;
bool done;
+ concurrent::CountDownLatch startedLatch;
public:
- InternalCommandListener(void) {
+ InternalCommandListener(void) : startedLatch(1) {
command = NULL;
+ response = NULL;
transport = NULL;
responseBuilder = NULL;
done = false;
-
+
this->start();
+ startedLatch.await();
}
virtual ~InternalCommandListener() {
@@ -72,6 +77,8 @@
mutex.notifyAll();
}
this->join();
+
+ delete response;
}
void setTransport( DummyTransport* transport ){
@@ -87,9 +94,13 @@
synchronized( &mutex )
{
this->command = command;
-
+ // Create a response now before the caller has a
+ // chance to destroy the command.
+ this->response =
+ responseBuilder->buildResponse( command );
+
mutex.notifyAll();
- }
+ }
}
void run(void)
@@ -100,35 +111,31 @@
{
while( !done )
{
+ startedLatch.countDown();
mutex.wait();
-
+
if( command == NULL )
{
continue;
}
-
- concurrent::Thread::sleep( 100 );
-
- if( responseBuilder != NULL &&
- transport != NULL )
+
+ // If we created a response then send it.
+ if( response != NULL && transport != NULL )
{
- transport->fireCommand(
- responseBuilder->buildResponse(
- command ) );
-
- command = NULL;
-
- return;
+ transport->fireCommand( this->response );
}
+
+ this->response = NULL;
+ this->command = NULL;
}
}
}
AMQ_CATCHALL_NOTHROW()
}
};
-
+
private:
-
+
ResponseBuilder* responseBuilder;
CommandListener* commandListener;
CommandListener* outgoingCommandListener;
@@ -137,112 +144,67 @@
concurrent::Mutex commandIdMutex;
bool own;
InternalCommandListener defaultListener;
-
+ static DummyTransport* instance;
+
public:
-
- DummyTransport( ResponseBuilder* responseBuilder ,
- bool own = false,
- bool useDefOutgoing = false ){
-
- this->responseBuilder = NULL;
- this->commandListener = NULL;
- this->outgoingCommandListener = NULL;
- this->exceptionListener = NULL;
- this->responseBuilder = responseBuilder;
- this->own = own;
- this->nextCommandId = 0;
- if( useDefOutgoing )
- {
- this->outgoingCommandListener = &defaultListener;
- this->defaultListener.setTransport( this );
- this->defaultListener.setResponseBuilder( responseBuilder );
- }
- }
-
- virtual ~DummyTransport(){
- if( own ){
- delete responseBuilder;
- }
+
+ static DummyTransport* getInstance() {
+ return instance;
}
-
+
+ DummyTransport( ResponseBuilder* responseBuilder ,
+ bool own = false,
+ bool useDefOutgoing = false );
+
+ virtual ~DummyTransport();
+
void setResponseBuilder( ResponseBuilder* responseBuilder ){
this->responseBuilder = responseBuilder;
}
-
- unsigned int getNextCommandId() throw (exceptions::ActiveMQException){
-
- try{
- synchronized( &commandIdMutex ){
- return ++nextCommandId;
- }
-
- // Should never get here, but some compilers aren't
- // smart enough to figure out we'll never get here.
- return 0;
- }
- AMQ_CATCHALL_THROW( transport::CommandIOException )
- }
-
- virtual void oneway( Command* command )
- throw(CommandIOException, exceptions::UnsupportedOperationException)
- {
- if( outgoingCommandListener != NULL ){
-
- command->setCommandId( getNextCommandId() );
- command->setResponseRequired( false );
- outgoingCommandListener->onCommand( command );
- return;
- }
- }
-
- virtual Response* request( Command* command )
- throw(CommandIOException,
- exceptions::UnsupportedOperationException)
- {
- if( responseBuilder != NULL ){
- command->setCommandId( getNextCommandId() );
- command->setResponseRequired( true );
- return responseBuilder->buildResponse( command );
- }
-
- throw CommandIOException( __FILE__, __LINE__,
- "no response builder available" );
- }
-
+
+ unsigned int getNextCommandId() throw (exceptions::ActiveMQException);
+
+ virtual void oneway( Command* command )
+ throw(CommandIOException, exceptions::UnsupportedOperationException);
+
+ virtual Response* request( Command* command )
+ throw(CommandIOException,
+ exceptions::UnsupportedOperationException);
+
virtual void setCommandListener( CommandListener* listener ){
this->commandListener = listener;
}
-
+
virtual void setOutgoingCommandListener( CommandListener* listener ){
outgoingCommandListener = listener;
}
-
+
virtual void setCommandReader( CommandReader* reader AMQCPP_UNUSED){}
-
+
virtual void setCommandWriter( CommandWriter* writer AMQCPP_UNUSED){}
-
- virtual void setTransportExceptionListener(
+
+ virtual void setTransportExceptionListener(
TransportExceptionListener* listener )
{
this->exceptionListener = listener;
}
-
+
virtual void fireCommand( Command* cmd ){
if( commandListener != NULL ){
commandListener->onCommand( cmd );
}
}
-
+
virtual void fireException( const exceptions::ActiveMQException& ex ){
if( exceptionListener != NULL ){
exceptionListener->onTransportException( this, ex );
}
}
-
+
virtual void start() throw (cms::CMSException){}
virtual void close() throw (cms::CMSException){}
};
-
+
}}
#endif /*ACTIVEMQ_TANSPORT_DUMMYTRANSPORT_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.cpp Thu Apr 12 13:29:39 2007
@@ -14,14 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#include "DummyTransportFactory.h"
#include <activemq/support/LibraryInit.h>
+#include <activemq/connector/stomp/StompResponseBuilder.h>
+#include <activemq/transport/DummyTransport.h>
using namespace activemq;
using namespace activemq::transport;
+using namespace activemq::util;
////////////////////////////////////////////////////////////////////////////////
-//TransportFactoryMapRegistrar DummyTransportFactory::registrar(
-// "dummy", new DummyTransportFactory());
-//
+/**
+ * Creates a DummyTransport configured from the given properties.
+ * @param properties read for "wireFormat" (default "stomp") and, for the
+ *        stomp format, "transport.sessionId" (default "testSessionId").
+ * @param next transport that a filter would wrap; unused here and deleted
+ *        when own is true.
+ * @param own whether this factory has been given ownership of next.
+ * @return a new DummyTransport that owns its response builder and uses its
+ *         default outgoing listener.
+ */
+Transport* DummyTransportFactory::createTransport(
+    const activemq::util::Properties& properties,
+    Transport* next,
+    bool own ) throw ( exceptions::ActiveMQException )
+{
+    // A DummyTransport never wraps another transport, so dispose of the one
+    // handed in when ownership came with it.
+    if( own ) {
+        delete next;
+    }
+
+    DummyTransport::ResponseBuilder* builder = NULL;
+
+    const std::string wireFormat =
+        properties.getProperty( "wireFormat", "stomp" );
+
+    if( wireFormat == "stomp" ) {
+        builder = new connector::stomp::StompResponseBuilder(
+            properties.getProperty(
+                "transport.sessionId", "testSessionId" ) );
+    }
+
+    // NOTE(review): for any other wireFormat the builder stays NULL and is
+    // passed to the transport as-is - confirm that callers expect this.
+    return new DummyTransport( builder, true, true );
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/DummyTransportFactory.h Thu Apr 12 13:29:39 2007
@@ -18,52 +18,32 @@
#ifndef ACTIVEMQ_TRANSPORT_DUMMYTRANSPORTFACTORY_H_
#define ACTIVEMQ_TRANSPORT_DUMMYTRANSPORTFACTORY_H_
-#include <activemq/transport/DummyTransport.h>
#include <activemq/transport/TransportFactory.h>
-#include <activemq/transport/TransportFactoryMapRegistrar.h>
-#include <activemq/connector/stomp/StompResponseBuilder.h>
#include <activemq/support/LibraryInit.h>
namespace activemq{
namespace transport{
-
+
/**
* Manufactures DummyTransports, which are objects that
* read from input streams and write to output streams.
*/
class DummyTransportFactory : public TransportFactory{
- private:
-
- // static TransportFactoryMapRegistrar registrar;
-
public:
-
+
virtual ~DummyTransportFactory(){}
-
+
/**
- * Creates a Transport instance.
+ * Creates a Transport instance.
* @param properties The properties for the transport.
*/
- virtual Transport* createTransport(
- const activemq::util::Properties& properties )
- {
- std::string wireFormat =
- properties.getProperty( "wireFormat", "stomp" );
-
- DummyTransport::ResponseBuilder* builder = NULL;
-
- if( wireFormat == "stomp" )
- {
- builder = new connector::stomp::StompResponseBuilder(
- properties.getProperty(
- "transport.sessionId", "testSessionId" ) );
- }
-
- return new DummyTransport( builder, true );
- }
+ virtual Transport* createTransport(
+ const activemq::util::Properties& properties,
+ Transport* next = NULL,
+ bool own = true ) throw ( exceptions::ActiveMQException );
};
-
+
}}
#endif /*ACTIVEMQ_TRANSPORT_DUMMYTRANSPORTFACTORY_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.h Thu Apr 12 13:29:39 2007
@@ -47,27 +47,32 @@
CPPUNIT_TEST_SUITE_END();
public:
-
+
class MyCommand : public Command{
public:
MyCommand(){ c = 0; }
virtual ~MyCommand(){}
-
+
char c;
-
+
virtual void setCommandId( int id AMQCPP_UNUSED){}
virtual int getCommandId() const{ return 0; }
-
+
virtual void setResponseRequired( const bool required AMQCPP_UNUSED){}
virtual bool isResponseRequired() const{ return false; }
virtual std::string toString() const{ return ""; }
+ /**
+  * Creates a heap-allocated MyCommand carrying the same payload
+  * character as this one.
+  * @return the copy, allocated with new.
+  */
+ virtual Command* cloneCommand() const{
+     MyCommand* duplicate = new MyCommand();
+     duplicate->c = this->c;
+     return duplicate;
+ }
};
-
+
class MyCommandListener : public CommandListener{
public:
MyCommandListener(){}
virtual ~MyCommandListener(){}
-
+
std::string str;
virtual void onCommand( Command* command ){
const MyCommand* cmd = dynamic_cast<const MyCommand*>(command);
@@ -75,21 +80,21 @@
delete command;
}
};
-
+
class MyCommandReader : public CommandReader{
private:
-
+
/**
* The target input stream.
*/
io::InputStream* inputStream;
-
+
public:
MyCommandReader(){ throwException = false; }
virtual ~MyCommandReader(){}
-
+
bool throwException;
-
+
virtual void setInputStream(io::InputStream* is){
inputStream = is;
}
@@ -99,28 +104,28 @@
}
virtual Command* readCommand( void ) throw (CommandIOException){
-
+
try{
if( throwException ){
throw CommandIOException();
}
-
+
synchronized( inputStream ){
MyCommand* command = new MyCommand();
try{
command->c = inputStream->read();
} catch( exceptions::ActiveMQException& ex ){
-
+
// Free the memory.
delete command;
-
+
ex.setMark( __FILE__, __LINE__ );
throw ex;
}
-
+
return command;
}
-
+
assert(false);
return NULL;
}catch( exceptions::ActiveMQException& ex ){
@@ -130,45 +135,45 @@
}
}
- virtual std::size_t read(unsigned char* buffer AMQCPP_UNUSED,
- std::size_t count AMQCPP_UNUSED)
+ virtual std::size_t read(unsigned char* buffer AMQCPP_UNUSED,
+ std::size_t count AMQCPP_UNUSED)
throw( io::IOException ) {
return 0;
}
-
+
virtual unsigned char readByte() throw(io::IOException) {
return 0;
}
};
-
+
class MyCommandWriter : public CommandWriter{
private:
-
+
/**
* Target output stream.
*/
io::OutputStream* outputStream;
-
+
public:
virtual ~MyCommandWriter(){}
virtual void setOutputStream(io::OutputStream* os){
outputStream = os;
}
-
+
virtual io::OutputStream* getOutputStream(void){
return outputStream;
}
- virtual void writeCommand( Command* command )
+ virtual void writeCommand( Command* command )
throw (CommandIOException)
{
try{
synchronized( outputStream ){
-
- const MyCommand* m =
+
+ const MyCommand* m =
dynamic_cast<const MyCommand*>(command);
- outputStream->write( m->c );
+ outputStream->write( m->c );
}
}catch( exceptions::ActiveMQException& ex ){
CommandIOException cx( ex );
@@ -177,25 +182,25 @@
}
}
- virtual void write( const unsigned char* buffer AMQCPP_UNUSED,
- std::size_t count AMQCPP_UNUSED)
+ virtual void write( const unsigned char* buffer AMQCPP_UNUSED,
+ std::size_t count AMQCPP_UNUSED)
throw(io::IOException) {}
-
+
virtual void writeByte(unsigned char v AMQCPP_UNUSED) throw(io::IOException) {}
};
-
+
class MyExceptionListener : public TransportExceptionListener{
public:
-
+
Transport* transport;
concurrent::Mutex mutex;
-
+
MyExceptionListener(){
transport = NULL;
}
virtual ~MyExceptionListener(){}
-
- virtual void onTransportException( Transport* source,
+
+ virtual void onTransportException( Transport* source,
const exceptions::ActiveMQException& ex AMQCPP_UNUSED){
transport = source;
@@ -204,16 +209,16 @@
mutex.notify();
}
}
- };
-
+ };
+
public:
virtual ~IOTransportTest(){}
-
- // This will just test that we can start and stop the
+
+ // This will just test that we can start and stop the
// transport without any exceptions.
void testStartClose(){
-
+
io::BlockingByteArrayInputStream is;
io::ByteArrayOutputStream os;
MyCommandListener listener;
@@ -227,16 +232,16 @@
transport.setInputStream( &is );
transport.setOutputStream( &os );
transport.setTransportExceptionListener( &exListener );
-
+
transport.start();
-
+
concurrent::Thread::sleep( 50 );
-
+
transport.close();
}
-
+
void testRead(){
-
+
io::BlockingByteArrayInputStream is;
io::ByteArrayOutputStream os;
MyCommandListener listener;
@@ -250,11 +255,11 @@
transport.setInputStream( &is );
transport.setOutputStream( &os );
transport.setTransportExceptionListener( &exListener );
-
+
transport.start();
-
+
concurrent::Thread::sleep( 10 );
-
+
unsigned char buffer[10] = { '1', '2', '3', '4', '5', '6', '7', '8', '9', '0' };
try{
synchronized( &is ){
@@ -263,16 +268,16 @@
}catch( exceptions::ActiveMQException& ex ){
ex.setMark( __FILE__, __LINE__ );
}
-
+
concurrent::Thread::sleep( 100 );
-
+
CPPUNIT_ASSERT( listener.str == "1234567890" );
-
+
transport.close();
}
-
+
void testWrite(){
-
+
io::BlockingByteArrayInputStream is;
io::ByteArrayOutputStream os;
MyCommandListener listener;
@@ -286,9 +291,9 @@
transport.setInputStream( &is );
transport.setOutputStream( &os );
transport.setTransportExceptionListener( &exListener );
-
+
transport.start();
-
+
MyCommand cmd;
cmd.c = '1';
transport.oneway( &cmd );
@@ -300,7 +305,7 @@
transport.oneway( &cmd );
cmd.c = '5';
transport.oneway( &cmd );
-
+
const unsigned char* bytes = os.getByteArray();
std::size_t size = os.getByteArraySize();
CPPUNIT_ASSERT( size >= 5 );
@@ -309,12 +314,12 @@
CPPUNIT_ASSERT( bytes[2] == '3' );
CPPUNIT_ASSERT( bytes[3] == '4' );
CPPUNIT_ASSERT( bytes[4] == '5' );
-
+
transport.close();
}
-
+
void testException(){
-
+
io::BlockingByteArrayInputStream is;
io::ByteArrayOutputStream os;
MyCommandListener listener;
@@ -329,18 +334,18 @@
transport.setInputStream( &is );
transport.setOutputStream( &os );
transport.setTransportExceptionListener( &exListener );
-
+
unsigned char buffer[1] = { '1' };
try{
- synchronized( &is ){
+ synchronized( &is ){
is.setByteArray( buffer, 1);
}
}catch( exceptions::ActiveMQException& ex ){
ex.setMark(__FILE__, __LINE__ );
}
-
+
transport.start();
-
+
synchronized(&exListener.mutex)
{
if(exListener.transport != &transport)
@@ -348,13 +353,13 @@
exListener.mutex.wait(1000);
}
}
-
+
CPPUNIT_ASSERT( exListener.transport == &transport );
-
+
transport.close();
}
};
-
+
}}
#endif /*ACTIVEMQ_COMMANDS_IOTRANSPORTTEST_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapRegistrarTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapRegistrarTest.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapRegistrarTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapRegistrarTest.h Thu Apr 12 13:29:39 2007
@@ -27,37 +27,41 @@
namespace activemq{
namespace transport{
-
- class TransportFactoryMapRegistrarTest : public CppUnit::TestFixture {
-
- CPPUNIT_TEST_SUITE( TransportFactoryMapRegistrarTest );
- CPPUNIT_TEST( test );
- CPPUNIT_TEST_SUITE_END();
-
- public:
-
- class TestTransportFactory : public TransportFactory
- {
- public:
-
- virtual Transport* createTransport(
- const activemq::util::Properties& properties AMQCPP_UNUSED ) { return NULL; };
- };
-
+
+ class TransportFactoryMapRegistrarTest : public CppUnit::TestFixture {
+
+ CPPUNIT_TEST_SUITE( TransportFactoryMapRegistrarTest );
+ CPPUNIT_TEST( test );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ /**
+  * Minimal TransportFactory used only as something to register and
+  * look up; it never builds a transport.
+  */
+ class TestTransportFactory : public TransportFactory
+ {
+ public:
+
+     /**
+      * Stub implementation.
+      * @return always NULL.
+      */
+     virtual Transport* createTransport(
+         const activemq::util::Properties& properties AMQCPP_UNUSED,
+         Transport* next = NULL,
+         bool own = true ) throw ( exceptions::ActiveMQException )
+     {
+         return NULL;
+     }
+ };
+
virtual ~TransportFactoryMapRegistrarTest(){}
-
- void test(){
-
- {
- TransportFactoryMapRegistrar registrar("Test", new TestTransportFactory());
-
- CPPUNIT_ASSERT( TransportFactoryMap::getInstance().lookup("Test") != NULL);
- }
-
- CPPUNIT_ASSERT( TransportFactoryMap::getInstance().lookup( "Test" ) == NULL );
- }
- };
-
+
+ void test(){
+
+ {
+ TransportFactoryMapRegistrar registrar("Test", new TestTransportFactory());
+
+ CPPUNIT_ASSERT( TransportFactoryMap::getInstance().lookup("Test") != NULL);
+ }
+
+ CPPUNIT_ASSERT( TransportFactoryMap::getInstance().lookup( "Test" ) == NULL );
+ }
+ };
+
}}
#endif /*ACTIVEMQ_TRANSPORT_CONNECTORFACTORYMAPREGISTRARTEST_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapTest.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/TransportFactoryMapTest.h Thu Apr 12 13:29:39 2007
@@ -27,38 +27,43 @@
namespace activemq{
namespace transport{
-
- class TransportFactoryMapTest : public CppUnit::TestFixture {
-
- CPPUNIT_TEST_SUITE( TransportFactoryMapTest );
- CPPUNIT_TEST( test );
- CPPUNIT_TEST_SUITE_END();
-
- public:
-
- class TestTransportFactory : public TransportFactory
- {
- public:
-
- virtual Transport* createTransport(
- const activemq::util::Properties& properties AMQCPP_UNUSED) { return NULL; };
- };
-
+
+ class TransportFactoryMapTest : public CppUnit::TestFixture {
+
+ CPPUNIT_TEST_SUITE( TransportFactoryMapTest );
+ CPPUNIT_TEST( test );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ /**
+  * Minimal TransportFactory used only as something to register and
+  * look up in the factory map; it never builds a transport.
+  */
+ class TestTransportFactory : public TransportFactory
+ {
+ public:
+
+     /**
+      * Stub implementation.
+      * @return always NULL.
+      */
+     virtual Transport* createTransport(
+         const activemq::util::Properties& properties AMQCPP_UNUSED,
+         Transport* next = NULL,
+         bool own = true ) throw ( exceptions::ActiveMQException )
+     {
+         return NULL;
+     }
+ };
+
virtual ~TransportFactoryMapTest(){}
-
- void test(){
-
- TransportFactoryMap& factMap =
+
+ void test(){
+
+ TransportFactoryMap& factMap =
TransportFactoryMap::getInstance();
- TestTransportFactory testFactory;
-
- factMap.registerTransportFactory( "test", &testFactory );
-
- CPPUNIT_ASSERT( factMap.lookup( "test" ) == &testFactory );
-
- std::vector<std::string> names;
- CPPUNIT_ASSERT( factMap.getFactoryNames( names ) >= 1 );
-
+ TestTransportFactory testFactory;
+
+ factMap.registerTransportFactory( "test", &testFactory );
+
+ CPPUNIT_ASSERT( factMap.lookup( "test" ) == &testFactory );
+
+ std::vector<std::string> names;
+ CPPUNIT_ASSERT( factMap.getFactoryNames( names ) >= 1 );
+
bool found = false;
for( unsigned int i = 0; i < names.size(); ++i )
{
@@ -68,13 +73,13 @@
break;
}
}
- CPPUNIT_ASSERT( found );
-
- factMap.unregisterTransportFactory( "test" );
- CPPUNIT_ASSERT( factMap.lookup( "test" ) == NULL );
- }
- };
-
+ CPPUNIT_ASSERT( found );
+
+ factMap.unregisterTransportFactory( "test" );
+ CPPUNIT_ASSERT( factMap.lookup( "test" ) == NULL );
+ }
+ };
+
}}
#endif /*ACTIVEMQ_TRANSPORT_TRANSPORTFACTORYMAPTEST_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ResponseCorrelatorTest.h"
+
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::filters::ResponseCorrelatorTest );
+
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/filters/ResponseCorrelatorTest.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATORTEST_H_
+#define ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATORTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <activemq/transport/filters/ResponseCorrelator.h>
+#include <activemq/concurrent/Thread.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/exceptions/UnsupportedOperationException.h>
+#include <activemq/util/Config.h>
+#include <queue>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+ class ResponseCorrelatorTest : public CppUnit::TestFixture {
+
+ CPPUNIT_TEST_SUITE( ResponseCorrelatorTest );
+ CPPUNIT_TEST( testBasics );
+ CPPUNIT_TEST( testOneway );
+ CPPUNIT_TEST( testTransportException );
+ CPPUNIT_TEST( testMultiRequests );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ // Minimal Command stub holding just an id and a response-required flag.
+ class MyCommand : public Command{
+ private:
+     unsigned int commandId;
+     bool responseRequired;
+ public:
+     // Give both members a defined value; nothing else sets them before a setter runs.
+     MyCommand() : commandId( 0 ), responseRequired( false ){}
+
+     virtual void setCommandId( int id ){
+         commandId = id;
+     }
+     virtual int getCommandId() const{
+         return commandId;
+     }
+     virtual void setResponseRequired( const bool required ){
+         responseRequired = required;
+     }
+     virtual bool isResponseRequired() const{
+         return responseRequired;
+     }
+
+     virtual std::string toString() const{ return ""; }
+     // Returns a new MyCommand carrying the same id and flag.
+     virtual Command* cloneCommand() const{
+         MyCommand* command = new MyCommand;
+         command->commandId = commandId;
+         command->responseRequired = responseRequired;
+         return command;
+     }
+ };
+
+ // Minimal Response stub: a command id, a flag and the correlation id.
+ class MyResponse : public Response{
+ private:
+     unsigned int commandId;
+     bool responseRequired;
+     unsigned int corrId;
+ public:
+     // Give every member a defined value; nothing else sets them before a setter runs.
+     MyResponse() : commandId( 0 ), responseRequired( false ), corrId( 0 ){}
+
+     virtual void setCommandId( int id ){
+         commandId = id;
+     }
+     virtual int getCommandId() const{
+         return commandId;
+     }
+     virtual void setResponseRequired( const bool required ){
+         responseRequired = required;
+     }
+     virtual bool isResponseRequired() const{
+         return responseRequired;
+     }
+     virtual int getCorrelationId() const{
+         return corrId;
+     }
+     virtual void setCorrelationId( int corrId ){
+         this->corrId = corrId;
+     }
+
+     virtual std::string toString() const{ return ""; }
+
+     // Returns a new MyResponse carrying the same three values.
+     virtual Command* cloneCommand() const{
+         MyResponse* command = new MyResponse;
+         command->commandId = commandId;
+         command->responseRequired = responseRequired;
+         command->corrId = corrId;
+         return command;
+     }
+ };
+
+ // Loopback transport used by the tests: oneway() queues a command and the
+ // thread created by start() hands it (plus any response) to the listener.
+ class MyTransport : public Transport,
+                     public concurrent::Runnable{
+ public:
+     CommandReader* reader;
+     CommandWriter* writer;
+     CommandListener* listener;
+     TransportExceptionListener* exListener;
+     concurrent::Thread* thread;
+     concurrent::Mutex mutex;         // guards requests; run() tests done while holding it
+     concurrent::Mutex startedMutex;  // notified by run() as soon as it starts
+     bool done;
+     std::queue<Command*> requests;
+
+ public:
+
+     MyTransport(){
+         reader = NULL;
+         writer = NULL;
+         listener = NULL;
+         exListener = NULL;
+         thread = NULL;
+         done = false;
+     }
+
+     virtual ~MyTransport(){
+         // Stop and join the worker thread, if one is running.
+         close();
+     }
+     // Queues the command for the worker thread and notifies it.
+     virtual void oneway( Command* command )
+         throw(CommandIOException, exceptions::UnsupportedOperationException)
+     {
+         synchronized( &mutex ){
+             requests.push( command );
+             mutex.notifyAll();
+         }
+     }
+     // Not supported by this stub: always throws UnsupportedOperationException.
+     virtual Response* request( Command* command AMQCPP_UNUSED)
+         throw(CommandIOException, exceptions::UnsupportedOperationException)
+     {
+         throw exceptions::UnsupportedOperationException(
+             __FILE__,
+             __LINE__,
+             "stuff" );
+     }
+
+     virtual void setCommandListener( CommandListener* listener ){
+         this->listener = listener;
+     }
+
+     virtual void setCommandReader( CommandReader* reader ){
+         this->reader = reader;
+     }
+
+     virtual void setCommandWriter( CommandWriter* writer ){
+         this->writer = writer;
+     }
+
+     virtual void setTransportExceptionListener(
+         TransportExceptionListener* listener )
+     {
+         this->exListener = listener;
+     }
+     // Stops any previous worker, then starts a new thread that executes run().
+     virtual void start() throw( cms::CMSException ){
+         close();
+
+         done = false;
+
+         thread = new concurrent::Thread( this );
+         thread->start();
+     }
+     // Asks the worker to stop, then joins and deletes its thread if there is one.
+     virtual void close() throw( cms::CMSException ){
+         // Set the flag under the same mutex run() holds while it tests
+         // done, rather than writing it unsynchronised from this thread.
+         synchronized( &mutex ){
+             done = true;
+             mutex.notifyAll();
+         }
+         if( thread != NULL ){
+             thread->join();
+             delete thread;
+             thread = NULL;
+         }
+     }
+     // Builds the reply for a command; its correlation id is the command's id.
+     virtual Response* createResponse( Command* command ){
+
+         MyResponse* resp = new MyResponse();
+         resp->setCorrelationId( command->getCommandId() );
+         resp->setResponseRequired( false );
+         return resp;
+     }
+     // Worker loop: processes queued commands until done is set.
+     virtual void run(){
+
+         try{
+             // Announce that the thread is running to whoever waits on startedMutex.
+             synchronized(&startedMutex)
+             {
+                 startedMutex.notifyAll();
+             }
+
+             synchronized( &mutex ){
+
+                 while( !done ){
+
+                     if( requests.empty() ){
+                         mutex.wait();
+                     }else{
+
+                         Command* cmd = requests.front();
+                         requests.pop();
+
+                         // Only send a response if one is required.
+                         Response* resp = NULL;
+                         if( cmd->isResponseRequired() ){
+                             resp = createResponse( cmd );
+                         }
+                         // Release the mutex while calling out to the listener.
+                         mutex.unlock();
+                         // NOTE(review): if onCommand() throws, the mutex is already unlocked when the synchronized scope unwinds - confirm that is safe.
+                         // Send both the response and the original
+                         // command back to the correlator.
+                         if( listener != NULL ){
+                             if( resp != NULL ){
+                                 listener->onCommand( resp );
+                             }
+                             listener->onCommand( cmd );
+                         }
+
+                         mutex.lock();
+                     }
+                 }
+             }
+         }catch( exceptions::ActiveMQException& ex ){
+             if( exListener ){
+                 exListener->onTransportException( this, ex );
+             }
+         }
+         catch( ... ){
+             if( exListener ){
+                 exceptions::ActiveMQException ex( __FILE__, __LINE__, "stuff" );
+                 exListener->onTransportException( this, ex );
+             }
+         }
+     }
+ };
+
+ // MyTransport variant whose createResponse() always throws.
+ class MyBrokenTransport : public MyTransport{
+ public:
+     MyBrokenTransport(){}
+     virtual ~MyBrokenTransport(){}
+     // Throws an ActiveMQException instead of building a response.
+     virtual Response* createResponse( Command* command AMQCPP_UNUSED){
+         exceptions::ActiveMQException failure( __FILE__, __LINE__, "bad stuff" );
+         throw failure;
+     }
+ };
+
+ // Records what is delivered to it: the ids of received commands and the
+ // number of transport exceptions reported.
+ class MyListener : public CommandListener,
+                    public TransportExceptionListener{
+ public:
+
+     int exCount;                // incremented by onTransportException()
+     std::set<int> commands;     // command ids seen by onCommand()
+     concurrent::Mutex mutex;    // held while either member above is updated
+
+ public:
+
+     MyListener() : exCount( 0 ){}
+
+     virtual ~MyListener(){}
+
+     // Stores the command's id, then calls notify() on mutex.
+     virtual void onCommand( Command* command ){
+         synchronized( &mutex ){
+             const int receivedId = command->getCommandId();
+             commands.insert( receivedId );
+             mutex.notify();
+         }
+     }
+
+     // Counts the exception; both arguments are ignored.
+     virtual void onTransportException(
+         Transport* source AMQCPP_UNUSED,
+         const exceptions::ActiveMQException& ex AMQCPP_UNUSED)
+     {
+         synchronized( &mutex ){
+             ++exCount;
+         }
+     }
+ };
+
+ // Thread that issues one request() for its own MyCommand and keeps the reply in resp.
+ class RequestThread : public concurrent::Thread{
+ public:
+     Transport* transport;
+     MyCommand cmd;
+     Response* resp;       // NULL until run() stores a response; deleted by the destructor
+ public:
+
+     RequestThread(){
+         transport = NULL;
+         resp = NULL;
+     }
+     virtual ~RequestThread(){
+         // Wait for run() to finish before releasing what it stored.
+         join();
+         delete resp;    // deleting NULL is a no-op
+         resp = NULL;
+     }
+
+     void setTransport( Transport* transport ){
+         this->transport = transport;
+     }
+
+     // Thread body. A failed request is reported by leaving resp NULL, which
+     // testMultiRequests asserts on after join(); asserting here would throw
+     // a CppUnit exception on a thread the test runner is not watching.
+     void run(){
+         try{
+             resp = transport->request(&cmd);
+         }catch( ... ){
+             resp = NULL;
+         }
+     }
+ };
+
+ public:
+
+ virtual ~ResponseCorrelatorTest(){} // Nothing to release: each test builds its fixtures as locals.
+
+ // A single request() must return the correlated response, and the command
+ // the transport relays back must reach the listener without any exception.
+ void testBasics(){
+     try{
+         MyListener listener;
+         MyTransport transport;
+         ResponseCorrelator correlator( &transport, false );
+         correlator.setCommandListener( &listener );
+         correlator.setTransportExceptionListener( &listener );
+         CPPUNIT_ASSERT( transport.listener == &correlator );
+         CPPUNIT_ASSERT( transport.exListener == &correlator );
+
+         // Hold startedMutex across start() so that run()'s notifyAll()
+         // cannot fire before this thread is waiting for it.
+         synchronized(&transport.startedMutex)
+         {
+             correlator.start();
+             transport.startedMutex.wait();
+         }
+
+         // Send one request.
+         MyCommand cmd;
+         Response* resp = correlator.request( &cmd );
+         CPPUNIT_ASSERT( resp != NULL );
+         CPPUNIT_ASSERT( resp->getCorrelationId() == cmd.getCommandId() );
+
+         // Wait to get the message back asynchronously.
+         concurrent::Thread::sleep( 100 );
+
+         // The transport relays the original command back as a non-response
+         // message. Read the listener's state under its own mutex, because
+         // MyListener updates that state while holding the same mutex.
+         synchronized( &listener.mutex ){
+             CPPUNIT_ASSERT( listener.commands.size() == 1 );
+             CPPUNIT_ASSERT( listener.exCount == 0 );
+         }
+
+         correlator.close();
+         delete resp;
+     }
+     AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+     AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+ }
+
+ // Sends many oneway commands and expects every one of them to be relayed
+ // back to the listener, with no transport exception.
+ void testOneway(){
+     try{
+         MyListener listener;
+         MyTransport transport;
+         ResponseCorrelator correlator( &transport, false );
+         correlator.setCommandListener( &listener );
+         correlator.setTransportExceptionListener( &listener );
+         CPPUNIT_ASSERT( transport.listener == &correlator );
+         CPPUNIT_ASSERT( transport.exListener == &correlator );
+
+         // Hold startedMutex across start() so run()'s notifyAll() cannot be missed.
+         synchronized(&transport.startedMutex)
+         {
+             correlator.start();
+             transport.startedMutex.wait();
+         }
+
+         // Send many oneway requests (we'll get them back asynchronously).
+         const unsigned int numCommands = 1000;
+         MyCommand commands[numCommands];
+         for( unsigned int ix=0; ix<numCommands; ix++ ){
+             correlator.oneway( &commands[ix] );
+         }
+
+         // Give the thread a little time to get all the messages back.
+         concurrent::Thread::sleep( 500 );
+
+         // Make sure we got them all back. The listener's state is read under
+         // its own mutex because MyListener updates it under that mutex.
+         synchronized( &listener.mutex ){
+             CPPUNIT_ASSERT( listener.commands.size() == numCommands );
+             CPPUNIT_ASSERT( listener.exCount == 0 );
+         }
+         correlator.close();
+     }
+     AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+     AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+ }
+
+ // With a transport whose createResponse() throws, request() must fail with
+ // CommandIOException and the listener must see exactly one exception.
+ void testTransportException(){
+     try{
+         MyListener listener;
+         MyBrokenTransport transport;
+         ResponseCorrelator correlator( &transport, false );
+         correlator.setCommandListener( &listener );
+         correlator.setTransportExceptionListener( &listener );
+         CPPUNIT_ASSERT( transport.listener == &correlator );
+         CPPUNIT_ASSERT( transport.exListener == &correlator );
+
+         // Hold startedMutex across start() so run()'s notifyAll() cannot be missed.
+         synchronized(&transport.startedMutex)
+         {
+             correlator.start();
+             transport.startedMutex.wait();
+         }
+
+         // Send one request.
+         MyCommand cmd;
+         try{
+             correlator.request( &cmd );
+             CPPUNIT_ASSERT(false);
+         }catch( CommandIOException& ex ){
+             // Expected.
+         }
+
+         // Give the transport thread time to report its exception.
+         concurrent::Thread::sleep( 200 );
+
+         // createResponse() throws before run() relays anything, so no
+         // command may have arrived and exactly one exception must have been
+         // counted. Read both under the mutex MyListener updates them under.
+         synchronized( &listener.mutex ){
+             CPPUNIT_ASSERT( listener.commands.size() == 0 );
+             CPPUNIT_ASSERT( listener.exCount == 1 );
+         }
+
+         correlator.close();
+     }
+     AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+     AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+ }
+
+ // Issues numRequests concurrent request() calls from separate threads and
+ // checks every reply is correlated and every command is relayed back.
+ void testMultiRequests(){
+     try{
+         MyListener listener;
+         MyTransport transport;
+         ResponseCorrelator correlator( &transport, false );
+         correlator.setCommandListener( &listener );
+         correlator.setTransportExceptionListener( &listener );
+         CPPUNIT_ASSERT( transport.listener == &correlator );
+         CPPUNIT_ASSERT( transport.exListener == &correlator );
+
+         // Start the transport while holding startedMutex, as the other
+         // tests do: run() cannot notify until this thread is in wait(), so
+         // the notification is never missed and the 500 ms timeout is only
+         // a safety net instead of the normal path.
+         synchronized(&transport.startedMutex)
+         {
+             correlator.start();
+
+             // Make sure the start command got down to the thread.
+             CPPUNIT_ASSERT( transport.thread != NULL );
+
+             transport.startedMutex.wait(500);
+         }
+
+         // Start all the requester threads.
+         const unsigned int numRequests = 100;
+         RequestThread requesters[numRequests];
+         for( unsigned int ix=0; ix<numRequests; ++ix ){
+             requesters[ix].setTransport( &correlator );
+             requesters[ix].start();
+         }
+
+         // Make sure we got all the responses and that they were all
+         // what we expected.
+         for( unsigned int ix=0; ix<numRequests; ++ix ){
+             requesters[ix].join();
+             CPPUNIT_ASSERT( requesters[ix].resp != NULL );
+             CPPUNIT_ASSERT( requesters[ix].cmd.getCommandId() == requesters[ix].resp->getCorrelationId() );
+         }
+
+         concurrent::Thread::sleep( 60 );
+         synchronized( &listener.mutex )
+         {
+             // Poll, at most numRequests times, until every relayed command
+             // has been recorded by the listener.
+             unsigned int count = 0;
+             while( listener.commands.size() != numRequests )
+             {
+                 listener.mutex.wait( 75 );
+
+                 ++count;
+                 if( count == numRequests ) {
+                     break;
+                 }
+             }
+
+             // Check the result while still holding the listener's mutex.
+             CPPUNIT_ASSERT( listener.commands.size() == numRequests );
+             CPPUNIT_ASSERT( listener.exCount == 0 );
+         }
+
+         correlator.close();
+     }
+     AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+     AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+ }
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATORTEST_H_*/