You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/02/19 02:06:08 UTC
svn commit: r745701 [3/3] - in /activemq/activemq-cpp/trunk/src: main/
main/activemq/core/ main/activemq/state/ main/activemq/transport/
main/activemq/transport/correlator/ main/activemq/transport/failover/
main/activemq/transport/logging/ main/activem...
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h Thu Feb 19 01:06:07 2009
@@ -26,6 +26,8 @@
namespace transport{
namespace tcp{
+ using decaf::lang::Pointer;
+
/**
* Factory Responsible for creating the TcpTransport.
*/
@@ -39,14 +41,17 @@
/**
* Creates a slimed down Transport instance which can be used in composite
* transport instances.
+ *
* @param location - URI location to connect to.
* @param wireformat - the assigned WireFormat for the new Transport.
* @param properties - Properties to apply to the transport.
+ *
+ * @return new Pointer to a TcpTransport.
* @throws ActiveMQexception if an error occurs
*/
- virtual Transport* doCreateComposite( const decaf::net::URI& location,
- wireformat::WireFormat* wireFormat,
- const decaf::util::Properties& properties )
+ virtual Pointer<Transport> doCreateComposite( const decaf::net::URI& location,
+ const Pointer<wireformat::WireFormat>& wireFormat,
+ const decaf::util::Properties& properties )
throw ( exceptions::ActiveMQException );
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h Thu Feb 19 01:06:07 2009
@@ -23,6 +23,7 @@
#include <decaf/io/DataInputStream.h>
#include <decaf/io/DataOutputStream.h>
#include <decaf/io/IOException.h>
+#include <decaf/lang/Pointer.h>
#include <activemq/util/Config.h>
#include <activemq/commands/Command.h>
@@ -33,6 +34,8 @@
namespace activemq{
namespace wireformat{
+ using decaf::lang::Pointer;
+
/**
* Provides a mechanism to marshal commands into and out of packets
* or into and out of streams, Channels and Datagrams.
@@ -50,7 +53,8 @@
* @param out - the output stream to write the command to.
* @throws IOException
*/
- virtual void marshal( commands::Command* command, decaf::io::DataOutputStream* out )
+ virtual void marshal( const Pointer<commands::Command>& command,
+ decaf::io::DataOutputStream* out )
throw ( decaf::io::IOException ) = 0;
/**
@@ -59,7 +63,7 @@
* @returns the newly marshaled Command, caller owns the pointer
* @throws IOException
*/
- virtual commands::Command* unmarshal( decaf::io::DataInputStream* in )
+ virtual Pointer<commands::Command> unmarshal( decaf::io::DataInputStream* in )
throw ( decaf::io::IOException ) = 0;
/**
@@ -84,11 +88,12 @@
/**
* If the Transport Provides a Negotiator this method will create and return
* a news instance of the Negotiator.
- * @returns new instance of a WireFormatNegotiator.
+ * @returns new instance of a WireFormatNegotiator as a Pointer<Transport>.
* @throws UnsupportedOperationException if the WireFormat doesn't have a Negotiator.
*/
- virtual WireFormatNegotiator* createNegotiator( transport::Transport* transport )
- throw( decaf::lang::exceptions::UnsupportedOperationException ) = 0;
+ virtual Pointer<transport::Transport> createNegotiator(
+ const Pointer<transport::Transport>& transport )
+ throw( decaf::lang::exceptions::UnsupportedOperationException ) = 0;
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatFactory.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatFactory.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatFactory.h Thu Feb 19 01:06:07 2009
@@ -21,11 +21,14 @@
#include <activemq/util/Config.h>
#include <activemq/wireformat/WireFormat.h>
#include <decaf/util/Properties.h>
+#include <decaf/lang/Pointer.h>
#include <decaf/lang/exceptions/IllegalStateException.h>
namespace activemq{
namespace wireformat{
+ using decaf::lang::Pointer;
+
/**
* The WireFormatFactory is the interface that all WireFormatFactory
* classes must extend. The Factory creates a WireFormat Object based on
@@ -40,9 +43,11 @@
/**
* Creates a new WireFormat Object passing it a set of
* properties from which it can obtain any optional settings
+ *
* @param properties - the Properties for this WireFormat
+ * @return Pointer to a new instance of a WireFormat object.
*/
- virtual WireFormat* createWireFormat( const decaf::util::Properties& properties )
+ virtual Pointer<WireFormat> createWireFormat( const decaf::util::Properties& properties )
throw ( decaf::lang::exceptions::IllegalStateException ) = 0;
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatNegotiator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatNegotiator.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatNegotiator.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatNegotiator.h Thu Feb 19 01:06:07 2009
@@ -20,10 +20,13 @@
#include <activemq/util/Config.h>
#include <activemq/transport/TransportFilter.h>
+#include <decaf/lang/Pointer.h>
namespace activemq {
namespace wireformat {
+ using decaf::lang::Pointer;
+
/**
* Defines a WireFormatNegotiator which allows a WireFormat to
*/
@@ -33,10 +36,9 @@
/**
* Constructor.
* @param next - the next Transport in the chain
- * @param own - true if this filter owns the next and should delete it
*/
- WireFormatNegotiator( Transport* next, const bool own = true ) :
- transport::TransportFilter( next, own ) {}
+ WireFormatNegotiator( const Pointer<transport::Transport>& next ) :
+ transport::TransportFilter( next ) {}
virtual ~WireFormatNegotiator() {}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp Thu Feb 19 01:06:07 2009
@@ -86,11 +86,11 @@
}
////////////////////////////////////////////////////////////////////////////////
-WireFormatNegotiator* OpenWireFormat::createNegotiator( transport::Transport* transport )
+Pointer<Transport> OpenWireFormat::createNegotiator( const Pointer<Transport>& transport )
throw( decaf::lang::exceptions::UnsupportedOperationException ) {
try{
- return new OpenWireFormatNegotiator( this, transport, true );
+ return Pointer<Transport>( new OpenWireFormatNegotiator( this, transport ) );
}
AMQ_CATCH_RETHROW( UnsupportedOperationException )
AMQ_CATCHALL_THROW( UnsupportedOperationException )
@@ -149,13 +149,13 @@
////////////////////////////////////////////////////////////////////////////////
void OpenWireFormat::setPreferedWireFormatInfo(
- commands::WireFormatInfo* info ) throw ( IllegalStateException ) {
+ const Pointer<commands::WireFormatInfo>& info ) throw ( IllegalStateException ) {
- this->preferedWireFormatInfo.reset( info );
+ this->preferedWireFormatInfo = info;
}
////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormat::marshal( commands::Command* command,
+void OpenWireFormat::marshal( const Pointer<commands::Command>& command,
decaf::io::DataOutputStream* dataOut )
throw ( decaf::io::IOException ) {
@@ -166,7 +166,7 @@
if( command != NULL ) {
DataStructure* dataStructure =
- dynamic_cast< DataStructure* >( command );
+ dynamic_cast< DataStructure* >( command.get() );
unsigned char type = dataStructure->getDataStructureType();
@@ -226,7 +226,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-commands::Command* OpenWireFormat::unmarshal( decaf::io::DataInputStream* dis )
+Pointer<commands::Command> OpenWireFormat::unmarshal( decaf::io::DataInputStream* dis )
throw ( decaf::io::IOException ) {
try {
@@ -236,7 +236,7 @@
}
// Get the unmarshalled DataStructure
- DataStructure* data = doUnmarshal( dis );
+ Pointer<DataStructure> data( doUnmarshal( dis ) );
if( data == NULL ) {
throw IOException(
@@ -246,19 +246,10 @@
}
// Now all unmarshals from this level should result in an object
- // that is a commands::Command type, if its not then we throw an
- // exception.
- commands::Command* command =
- dynamic_cast< commands::Command* >( data );
-
- if( command == NULL ) {
- delete data;
-
- throw IOException(
- __FILE__, __LINE__,
- "OpenWireFormat::doUnmarshal - "
- "Unmarshalled a non Command Type" );
- }
+ // that is a commands::Command type, if its not then the cast will
+ // throw an ClassCastException.
+ Pointer<Command> command =
+ data.dynamicCast<Command, Pointer<Command>::CounterType>();
return command;
}
@@ -513,10 +504,10 @@
}
////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormat::renegotiateWireFormat( WireFormatInfo* info )
+void OpenWireFormat::renegotiateWireFormat( const WireFormatInfo& info )
throw ( IllegalStateException ) {
- if( preferedWireFormatInfo.get() == NULL ) {
+ if( preferedWireFormatInfo == NULL ) {
throw IllegalStateException(
__FILE__, __LINE__,
"OpenWireFormat::renegotiateWireFormat - "
@@ -524,15 +515,15 @@
}
this->setVersion( Math::min( preferedWireFormatInfo->getVersion(),
- info->getVersion() ) );
- this->stackTraceEnabled = info->isStackTraceEnabled() &&
+ info.getVersion() ) );
+ this->stackTraceEnabled = info.isStackTraceEnabled() &&
preferedWireFormatInfo->isStackTraceEnabled();
- this->tcpNoDelayEnabled = info->isTcpNoDelayEnabled() &&
+ this->tcpNoDelayEnabled = info.isTcpNoDelayEnabled() &&
preferedWireFormatInfo->isTcpNoDelayEnabled();
- this->cacheEnabled = info->isCacheEnabled() &&
+ this->cacheEnabled = info.isCacheEnabled() &&
preferedWireFormatInfo->isCacheEnabled();
- this->tightEncodingEnabled = info->isTightEncodingEnabled() &&
+ this->tightEncodingEnabled = info.isTightEncodingEnabled() &&
preferedWireFormatInfo->isTightEncodingEnabled();
- this->sizePrefixDisabled = info->isSizePrefixDisabled() &&
+ this->sizePrefixDisabled = info.isSizePrefixDisabled() &&
preferedWireFormatInfo->isSizePrefixDisabled();
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.h Thu Feb 19 01:06:07 2009
@@ -23,6 +23,7 @@
#include <activemq/commands/DataStructure.h>
#include <activemq/wireformat/WireFormat.h>
#include <activemq/wireformat/openwire/utils/BooleanStream.h>
+#include <decaf/lang/Pointer.h>
#include <decaf/util/Properties.h>
#include <decaf/lang/exceptions/IllegalStateException.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
@@ -36,7 +37,39 @@
class DataStreamMarshaller;
}
+ using decaf::lang::Pointer;
+
class AMQCPP_API OpenWireFormat : public wireformat::WireFormat {
+ protected:
+
+ // Declared here to make life easier.
+ static const unsigned char NULL_TYPE;
+
+ // V1 if the default version we start at.
+ static const int DEFAULT_VERSION = 1;
+
+ private:
+
+ // Configuration parameters
+ decaf::util::Properties properties;
+
+ // Preferred WireFormatInfo
+ Pointer<commands::WireFormatInfo> preferedWireFormatInfo;
+
+ // Marshalers
+ std::vector< marshal::DataStreamMarshaller* > dataMarshallers;
+
+ // Uniquely Generated ID, initialize in the Ctor
+ std::string id;
+
+ // WireFormat Data
+ int version;
+ bool stackTraceEnabled;
+ bool tcpNoDelayEnabled;
+ bool cacheEnabled;
+ bool tightEncodingEnabled;
+ bool sizePrefixDisabled;
+
public:
/**
@@ -61,8 +94,9 @@
* a news instance of the Negotiator.
* @returns new instance of a WireFormatNegotiator.
*/
- virtual wireformat::WireFormatNegotiator* createNegotiator( transport::Transport* transport )
- throw( decaf::lang::exceptions::UnsupportedOperationException );
+ virtual Pointer<transport::Transport> createNegotiator(
+ const Pointer<transport::Transport>& transport )
+ throw( decaf::lang::exceptions::UnsupportedOperationException );
/**
* Allows an external source to add marshalers to this object for
@@ -77,7 +111,7 @@
* @param out - the output stream to write the command to.
* @throws IOException
*/
- virtual void marshal( commands::Command* command,
+ virtual void marshal( const Pointer<commands::Command>& command,
decaf::io::DataOutputStream* dataOut )
throw ( decaf::io::IOException );
@@ -87,7 +121,7 @@
* @returns the newly marshaled Command, caller owns the pointer
* @throws IOException
*/
- virtual commands::Command* unmarshal( decaf::io::DataInputStream* dis )
+ virtual Pointer<commands::Command> unmarshal( decaf::io::DataInputStream* dis )
throw ( decaf::io::IOException );
/**
@@ -159,22 +193,22 @@
* @param info - The new Wireformat Info settings
* @throws IllegalStateException is the params can't be negotiated.
*/
- void renegotiateWireFormat( commands::WireFormatInfo* info )
+ void renegotiateWireFormat( const commands::WireFormatInfo& info )
throw ( decaf::lang::exceptions::IllegalStateException );
/**
* Configures this object using the provided WireformatInfo object
* @param info - a WireFormatInfo object, takes ownership.
*/
- virtual void setPreferedWireFormatInfo( commands::WireFormatInfo* info )
+ virtual void setPreferedWireFormatInfo( const Pointer<commands::WireFormatInfo>& info )
throw ( decaf::lang::exceptions::IllegalStateException );
/**
* Gets the Preferred WireFormatInfo object that this class holds
* @return pointer to a preferred WireFormatInfo object
*/
- virtual commands::WireFormatInfo* getPreferedWireFormatInfo() const {
- return this->preferedWireFormatInfo.get();
+ virtual const Pointer<commands::WireFormatInfo>& getPreferedWireFormatInfo() const {
+ return this->preferedWireFormatInfo;
}
/**
@@ -292,36 +326,6 @@
*/
void destroyMarshalers();
- protected:
-
- // Declared here to make life easier.
- static const unsigned char NULL_TYPE;
-
- // V1 if the default version we start at.
- static const int DEFAULT_VERSION = 1;
-
- private:
-
- // This object config data
- decaf::util::Properties properties;
-
- // Preferred WireFormatInfo
- std::auto_ptr<commands::WireFormatInfo> preferedWireFormatInfo;
-
- // Marshalers
- std::vector< marshal::DataStreamMarshaller* > dataMarshallers;
-
- // Uniquely Generated ID, initialize in the Ctor
- std::string id;
-
- // WireFormat Data
- int version;
- bool stackTraceEnabled;
- bool tcpNoDelayEnabled;
- bool cacheEnabled;
- bool tightEncodingEnabled;
- bool sizePrefixDisabled;
-
};
}}}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp Thu Feb 19 01:06:07 2009
@@ -20,6 +20,7 @@
#include <decaf/lang/Boolean.h>
#include <decaf/lang/Integer.h>
+#include <decaf/lang/Pointer.h>
using namespace std;
using namespace activemq;
@@ -33,13 +34,13 @@
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
-WireFormat* OpenWireFormatFactory::createWireFormat(
+Pointer<WireFormat> OpenWireFormatFactory::createWireFormat(
const decaf::util::Properties& properties )
throw ( decaf::lang::exceptions::IllegalStateException ) {
try{
- WireFormatInfo* info = new WireFormatInfo();
+ Pointer<WireFormatInfo> info( new WireFormatInfo() );
// Configure the version to use
info->setVersion( Integer::parseInt(
@@ -63,12 +64,12 @@
"false" ) ) );
// Create the Openwire Format Object
- OpenWireFormat* f = new OpenWireFormat( properties );
+ Pointer<OpenWireFormat> wireFormat( new OpenWireFormat( properties ) );
// give the format object the ownership
- f->setPreferedWireFormatInfo( info );
+ wireFormat->setPreferedWireFormatInfo( info );
- return f;
+ return wireFormat;
}
AMQ_CATCH_RETHROW( IllegalStateException )
AMQ_CATCHALL_THROW( IllegalStateException )
Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h Thu Feb 19 01:06:07 2009
@@ -22,12 +22,15 @@
#include <activemq/wireformat/WireFormatFactory.h>
#include <activemq/commands/WireFormatInfo.h>
#include <decaf/lang/exceptions/IllegalStateException.h>
+#include <decaf/lang/Pointer.h>
#include <decaf/util/Properties.h>
namespace activemq{
namespace wireformat{
namespace openwire{
+ using decaf::lang::Pointer;
+
class AMQCPP_API OpenWireFormatFactory : public wireformat::WireFormatFactory {
public:
@@ -54,7 +57,7 @@
* properties from which it can obtain any optional settings
* @param properties - the Properties for this WireFormat
*/
- virtual wireformat::WireFormat* createWireFormat(
+ virtual Pointer<wireformat::WireFormat> createWireFormat(
const decaf::util::Properties& properties )
throw ( decaf::lang::exceptions::IllegalStateException );
Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp Thu Feb 19 01:06:07 2009
@@ -23,6 +23,7 @@
using namespace std;
using namespace activemq;
+using namespace activemq::commands;
using namespace activemq::exceptions;
using namespace activemq::wireformat;
using namespace activemq::wireformat::openwire;
@@ -32,15 +33,14 @@
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
-OpenWireFormatNegotiator::OpenWireFormatNegotiator( OpenWireFormat* openWireFormat,
- Transport* next,
- bool own ) :
- WireFormatNegotiator( next, own ),
+OpenWireFormatNegotiator::OpenWireFormatNegotiator( OpenWireFormat* wireFormat,
+ const Pointer<Transport>& next ) :
+ WireFormatNegotiator( next ),
wireInfoSentDownLatch(1),
readyCountDownLatch(1)
{
this->firstTime.set( true );
- this->openWireFormat = openWireFormat;
+ this->openWireFormat = wireFormat;
this->closed = true;
}
@@ -52,7 +52,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::oneway( commands::Command* command )
+void OpenWireFormatNegotiator::oneway( const Pointer<Command>& command )
throw( CommandIOException, UnsupportedOperationException ) {
try{
@@ -81,7 +81,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-commands::Response* OpenWireFormatNegotiator::request( commands::Command* command )
+Pointer<Response> OpenWireFormatNegotiator::request( const Pointer<Command>& command )
throw( CommandIOException, UnsupportedOperationException ) {
try{
@@ -110,8 +110,9 @@
}
////////////////////////////////////////////////////////////////////////////////
-commands::Response* OpenWireFormatNegotiator::request( commands::Command* command, unsigned int timeout )
- throw( CommandIOException, UnsupportedOperationException ) {
+Pointer<Response> OpenWireFormatNegotiator::request(
+ const Pointer<Command>& command, unsigned int timeout )
+ throw( CommandIOException, UnsupportedOperationException ) {
try{
@@ -139,11 +140,11 @@
}
////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::onCommand( commands::Command* command ) {
+void OpenWireFormatNegotiator::onCommand( const Pointer<Command>& command ) {
if( command->isWireFormatInfo() ) {
- WireFormatInfo* info = dynamic_cast<WireFormatInfo*>( command );
+ WireFormatInfo* info = dynamic_cast<WireFormatInfo*>( command.get() );
try {
@@ -156,7 +157,7 @@
}
wireInfoSentDownLatch.await( negotiationTimeout );
- openWireFormat->renegotiateWireFormat( info );
+ openWireFormat->renegotiateWireFormat( *info );
readyCountDownLatch.countDown();
@@ -225,7 +226,7 @@
// the message as it marshaled out to the wire
Transport* transport = this->next->narrow( typeid( transport::IOTransport ) );
if( transport == NULL ) {
- transport = this->next;
+ transport = this->next.get();
}
// We first send the WireFormat that we'd prefer.
Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h Thu Feb 19 01:06:07 2009
@@ -26,11 +26,14 @@
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/util/concurrent/Concurrent.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+#include <decaf/lang/Pointer.h>
namespace activemq{
namespace wireformat{
namespace openwire{
+ using decaf::lang::Pointer;
+
class AMQCPP_API OpenWireFormatNegotiator : public wireformat::WireFormatNegotiator {
private:
@@ -64,13 +67,12 @@
/**
* Constructor - Initializes this object around another Transport
- * @param openWireFormat - The WireFormat object we use to negotiate
+ * @param wireFormat - The WireFormat object we use to negotiate
* @param next - The next transport in the chain
* @param own - do we own the Transport pointer.
*/
- OpenWireFormatNegotiator( OpenWireFormat* openWireFormat,
- transport::Transport* next,
- bool own = true );
+ OpenWireFormatNegotiator( OpenWireFormat* wireFormat,
+ const Pointer<transport::Transport>& next );
virtual ~OpenWireFormatNegotiator();
@@ -85,7 +87,7 @@
* @throws UnsupportedOperationException if this method is not implemented
* by this transport.
*/
- virtual void oneway( commands::Command* command )
+ virtual void oneway( const Pointer<commands::Command>& command )
throw( transport::CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException );
@@ -97,7 +99,7 @@
* @return the response from the server.
* @throws CommandIOException if an error occurs with the request.
*/
- virtual commands::Response* request( commands::Command* command )
+ virtual Pointer<commands::Response> request( const Pointer<commands::Command>& command )
throw( transport::CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException );
@@ -110,9 +112,10 @@
* @return the response from the server.
* @throws CommandIOException if an error occurs with the request.
*/
- virtual commands::Response* request( commands::Command* command, unsigned int timeout )
- throw( transport::CommandIOException,
- decaf::lang::exceptions::UnsupportedOperationException );
+ virtual Pointer<commands::Response> request(
+ const Pointer<commands::Command>& command, unsigned int timeout )
+ throw( transport::CommandIOException,
+ decaf::lang::exceptions::UnsupportedOperationException );
/**
* This is called in the context of the nested transport's
@@ -122,7 +125,7 @@
* the command listener.
* @param command the received from the nested transport.
*/
- virtual void onCommand( commands::Command* command );
+ virtual void onCommand( const Pointer<commands::Command>& command );
/**
* Event handler for an exception from a command transport.
Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp Thu Feb 19 01:06:07 2009
@@ -37,58 +37,43 @@
#include <activemq/commands/WireFormatInfo.h>
using namespace activemq;
+using namespace activemq::commands;
using namespace activemq::wireformat;
using namespace activemq::wireformat::openwire;
using namespace activemq::transport;
using namespace activemq::transport::mock;
+using namespace decaf;
+using namespace decaf::lang;
////////////////////////////////////////////////////////////////////////////////
-commands::Response* OpenWireResponseBuilder::buildResponse(
- const commands::Command* command ){
+Pointer<Response> OpenWireResponseBuilder::buildResponse(
+ const Pointer<Command>& command ){
- if( typeid( *command ) == typeid( commands::ActiveMQBytesMessage ) ||
- typeid( *command ) == typeid( commands::ActiveMQMapMessage ) ||
- typeid( *command ) == typeid( commands::ActiveMQMessage ) ||
- typeid( *command ) == typeid( commands::ActiveMQObjectMessage ) ||
- typeid( *command ) == typeid( commands::ActiveMQStreamMessage ) ||
- typeid( *command ) == typeid( commands::ActiveMQTextMessage ) ||
- typeid( *command ) == typeid( commands::ConnectionInfo ) ||
- typeid( *command ) == typeid( commands::ConsumerInfo ) ||
- typeid( *command ) == typeid( commands::DestinationInfo ) ||
- typeid( *command ) == typeid( commands::ProducerInfo ) ||
- typeid( *command ) == typeid( commands::RemoveSubscriptionInfo ) ||
- typeid( *command ) == typeid( commands::RemoveInfo ) ||
- typeid( *command ) == typeid( commands::SessionInfo ) ) {
+ if( command->isResponseRequired() ) {
// These Commands just require a response that matches their command IDs
- commands::Response* response = new commands::Response();
+ Pointer<Response> response( new commands::Response() );
response->setCorrelationId( command->getCommandId() );
return response;
}
- // If this command requires a response we don't know what it is
- // so we throw an exception.
- if( command->isResponseRequired() ) {
- throw transport::CommandIOException( __FILE__, __LINE__,
- "OpenWireResponseBuilder - unrecognized command" );
- }
-
- return NULL;
+ return Pointer<Response>();
}
////////////////////////////////////////////////////////////////////////////////
void OpenWireResponseBuilder::buildIncomingCommands(
- const commands::Command* command, decaf::util::StlQueue<commands::Command*>& queue ){
+ const Pointer<Command>& command, decaf::util::StlQueue< Pointer<Command> >& queue ){
// Delegate this to buildResponse
if( command->isResponseRequired() ) {
queue.push( buildResponse( command ) );
}
- if( typeid( *command ) == typeid( commands::WireFormatInfo ) ) {
+ if( command->isWireFormatInfo() ) {
// Return a copy of the callers own requested WireFormatInfo
// so they get exactly the settings they asked for.
- queue.push( dynamic_cast<commands::Command*>( command->cloneDataStructure() ) );
+ queue.push( Pointer<Command>(
+ dynamic_cast<WireFormatInfo*>( command->cloneDataStructure() ) ) );
}
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h Thu Feb 19 01:06:07 2009
@@ -21,11 +21,14 @@
#include <activemq/util/Config.h>
#include <activemq/transport/mock/MockTransport.h>
#include <decaf/util/StlQueue.h>
+#include <decaf/lang/Pointer.h>
namespace activemq{
namespace wireformat{
namespace openwire{
+ using decaf::lang::Pointer;
+
class AMQCPP_API OpenWireResponseBuilder :
public transport::mock::MockTransport::ResponseBuilder{
public:
@@ -33,11 +36,12 @@
OpenWireResponseBuilder() {}
virtual ~OpenWireResponseBuilder() {}
- virtual commands::Response* buildResponse( const commands::Command* command );
+ virtual Pointer<commands::Response> buildResponse(
+ const Pointer<commands::Command>& command );
virtual void buildIncomingCommands(
- const commands::Command* command,
- decaf::util::StlQueue<commands::Command*>& queue );
+ const Pointer<commands::Command>& command,
+ decaf::util::StlQueue< Pointer<commands::Command> >& queue );
};
Modified: activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp Thu Feb 19 01:06:07 2009
@@ -21,6 +21,7 @@
#include <decaf/util/concurrent/Concurrent.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/lang/Thread.h>
+#include <decaf/lang/Pointer.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/mock/MockTransport.h>
@@ -30,8 +31,10 @@
using namespace activemq;
using namespace activemq::core;
+using namespace activemq::transport;
using namespace decaf;
using namespace decaf::util;
+using namespace decaf::lang;
////////////////////////////////////////////////////////////////////////////////
//void ActiveMQConnectionTest::test1WithStomp()
@@ -239,9 +242,8 @@
MyCommandListener cmdListener;
MyDispatcher msgListener;
std::string connectionId = "testConnectionId";
- decaf::util::Properties* properties =
- new decaf::util::Properties();
- transport::Transport* transport = NULL;
+ Pointer<decaf::util::Properties> properties( new decaf::util::Properties() );
+ Pointer<Transport> transport;
// Default to Stomp
properties->setProperty( "wireFormat", "openwire" );
Modified: activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQSessionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQSessionTest.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQSessionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQSessionTest.cpp Thu Feb 19 01:06:07 2009
@@ -553,7 +553,7 @@
// Send the Message
CPPUNIT_ASSERT( dTransport != NULL );
- MessageDispatch* dispatch = new MessageDispatch();
+ Pointer<MessageDispatch> dispatch( new MessageDispatch() );
dispatch->setMessage( msg );
dispatch->setConsumerId( Pointer<ConsumerId>( id.cloneDataStructure() ) );
Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp Thu Feb 19 01:06:07 2009
@@ -72,10 +72,14 @@
virtual bool hasNegotiator() const { return false; }
- virtual wireformat::WireFormatNegotiator* createNegotiator( transport::Transport* transport )
- throw( decaf::lang::exceptions::UnsupportedOperationException ) { return NULL; }
+ virtual Pointer<Transport> createNegotiator(
+ const Pointer<transport::Transport>& transport )
+ throw( decaf::lang::exceptions::UnsupportedOperationException ) {
- virtual commands::Command* unmarshal( decaf::io::DataInputStream* inputStream )
+ return Pointer<wireformat::WireFormatNegotiator>();
+ }
+
+ virtual Pointer<commands::Command> unmarshal( decaf::io::DataInputStream* inputStream )
throw ( CommandIOException ){
try{
@@ -87,7 +91,7 @@
synchronized( inputStream ){
- MyCommand* command = new MyCommand();
+ Pointer<MyCommand> command( new MyCommand() );
try{
// Throw a little uncertainty into the test.
@@ -97,16 +101,9 @@
command->c = inputStream->readByte();
} catch( decaf::lang::Exception& ex ){
-
- // Free the memory.
- delete command;
-
ex.setMark( __FILE__, __LINE__ );
throw CommandIOException();
} catch( ... ) {
- // Free the memory.
- delete command;
-
throw CommandIOException( __FILE__, __LINE__, "Catch all" );
}
@@ -114,7 +111,7 @@
}
CPPUNIT_ASSERT( false );
- return NULL;
+ return Pointer<Command>();
}catch( decaf::lang::Exception& ex ){
CommandIOException cx;
@@ -128,7 +125,8 @@
}
}
- virtual void marshal( commands::Command* command, decaf::io::DataOutputStream* outputStream )
+ virtual void marshal( const Pointer<commands::Command>& command,
+ decaf::io::DataOutputStream* outputStream )
throw (CommandIOException)
{
try{
@@ -136,7 +134,7 @@
synchronized( outputStream ){
const MyCommand* m =
- dynamic_cast<const MyCommand*>(command);
+ dynamic_cast<const MyCommand*>(command.get());
outputStream->write( m->c );
}
@@ -170,10 +168,9 @@
}
std::string str;
- virtual void onCommand( commands::Command* command ){
- const MyCommand* cmd = dynamic_cast<const MyCommand*>(command);
+ virtual void onCommand( const Pointer<commands::Command>& command ){
+ const MyCommand* cmd = dynamic_cast<const MyCommand*>(command.get());
str += cmd->c;
- delete command;
latch.countDown();
}
@@ -208,8 +205,8 @@
decaf::io::DataInputStream input( &is );
decaf::io::DataOutputStream output( &os );
MyTransportListener listener;
- MyWireFormat wireFormat;
- IOTransport transport( &wireFormat );
+ Pointer<MyWireFormat> wireFormat( new MyWireFormat() );
+ IOTransport transport( wireFormat );
transport.setTransportListener( &listener );
transport.setInputStream( &input );
transport.setOutputStream( &output );
@@ -230,11 +227,11 @@
decaf::io::DataOutputStream output( &bos );
for( int i = 0; i < 50; ++i ) {
- MyWireFormat wireFormat;
+ Pointer<MyWireFormat> wireFormat( new MyWireFormat() );
MyTransportListener listener;
IOTransport transport;
- transport.setWireFormat( &wireFormat );
+ transport.setWireFormat( wireFormat );
transport.setTransportListener( &listener );
transport.setInputStream( &input );
transport.setOutputStream( &output );
@@ -261,13 +258,13 @@
decaf::io::DataInputStream input( &is );
decaf::io::DataOutputStream output( &os );
- MyWireFormat wireFormat;
+ Pointer<MyWireFormat> wireFormat( new MyWireFormat() );
MyTransportListener listener(10);
IOTransport transport;
transport.setInputStream( &input );
transport.setOutputStream( &output );
transport.setTransportListener( &listener );
- transport.setWireFormat( &wireFormat );
+ transport.setWireFormat( wireFormat );
transport.start();
@@ -297,27 +294,27 @@
decaf::io::DataInputStream input( &is );
decaf::io::DataOutputStream output( &os );
- MyWireFormat wireFormat;
+ Pointer<MyWireFormat> wireFormat( new MyWireFormat() );
MyTransportListener listener;
IOTransport transport;
transport.setInputStream( &input );
transport.setOutputStream( &output );
transport.setTransportListener( &listener );
- transport.setWireFormat( &wireFormat );
+ transport.setWireFormat( wireFormat );
transport.start();
- MyCommand cmd;
- cmd.c = '1';
- transport.oneway( &cmd );
- cmd.c = '2';
- transport.oneway( &cmd );
- cmd.c = '3';
- transport.oneway( &cmd );
- cmd.c = '4';
- transport.oneway( &cmd );
- cmd.c = '5';
- transport.oneway( &cmd );
+ Pointer<MyCommand> cmd( new MyCommand() );
+ cmd->c = '1';
+ transport.oneway( cmd );
+ cmd->c = '2';
+ transport.oneway( cmd );
+ cmd->c = '3';
+ transport.oneway( cmd );
+ cmd->c = '4';
+ transport.oneway( cmd );
+ cmd->c = '5';
+ transport.oneway( cmd );
const unsigned char* bytes = os.toByteArray();
std::size_t size = os.size();
@@ -339,14 +336,14 @@
decaf::io::DataInputStream input( &is );
decaf::io::DataOutputStream output( &os );
- MyWireFormat wireFormat;
+ Pointer<MyWireFormat> wireFormat( new MyWireFormat() );
MyTransportListener listener;
IOTransport transport;
- wireFormat.throwException = true;
+ wireFormat->throwException = true;
transport.setInputStream( &input );
transport.setOutputStream( &output );
transport.setTransportListener( &listener );
- transport.setWireFormat( &wireFormat );
+ transport.setWireFormat( wireFormat );
unsigned char buffer[1] = { '1' };
try{
Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp Thu Feb 19 01:06:07 2009
@@ -17,34 +17,308 @@
#include "ResponseCorrelatorTest.h"
+#include <activemq/util/Config.h>
+#include <activemq/commands/BaseCommand.h>
+#include <activemq/transport/DefaultTransportListener.h>
+#include <activemq/transport/correlator/ResponseCorrelator.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/util/concurrent/Concurrent.h>
+#include <decaf/lang/exceptions/UnsupportedOperationException.h>
+#include <queue>
+
using namespace activemq;
using namespace activemq::transport;
using namespace activemq::transport::correlator;
////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelatorTest::testBasics(){
+namespace activemq{
+namespace transport{
+namespace correlator{
+
+ class MyCommand : public commands::BaseCommand{
+ private:
+
+ unsigned int commandId;
+
+ public:
+
+ virtual std::string toString() const{ return ""; }
+
+ virtual unsigned char getDataStructureType() const { return 1; }
+
+ virtual decaf::lang::Pointer<commands::Command> visit( activemq::state::CommandVisitor* visitor )
+ throw( exceptions::ActiveMQException ) { return decaf::lang::Pointer<commands::Command>(); }
+
+ virtual MyCommand* cloneDataStructure() const{
+ MyCommand* command = new MyCommand;
+ command->setCommandId( this->getCommandId() );
+ command->setResponseRequired( this->isResponseRequired() );
+ return command;
+ }
+ };
+
+ class MyTransport : public Transport, public decaf::lang::Runnable{
+ public:
+
+ TransportListener* listener;
+ decaf::lang::Thread* thread;
+ decaf::util::concurrent::Mutex mutex;
+ decaf::util::concurrent::Mutex startedMutex;
+ bool done;
+ std::queue< Pointer<commands::Command> > requests;
+
+ public:
+
+ MyTransport(){
+ listener = NULL;
+ thread = NULL;
+ done = false;
+ }
+
+ virtual ~MyTransport(){
+
+ close();
+ }
+
+ virtual void oneway( const Pointer<Command>& command )
+ throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
+ {
+ synchronized( &mutex ){
+ requests.push( command );
+ mutex.notifyAll();
+ }
+ }
+
+ virtual Pointer<Response> request( const Pointer<Command>& command AMQCPP_UNUSED )
+ throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
+ {
+ throw decaf::lang::exceptions::UnsupportedOperationException(
+ __FILE__, __LINE__, "stuff" );
+ }
+
+ virtual Pointer<Response> request( const Pointer<Command>& command AMQCPP_UNUSED,
+ unsigned int timeout AMQCPP_UNUSED )
+ throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
+ {
+ throw decaf::lang::exceptions::UnsupportedOperationException(
+ __FILE__, __LINE__, "stuff" );
+ }
+
+ virtual void setWireFormat(
+ const Pointer<wireformat::WireFormat>& wireFormat AMQCPP_UNUSED ) {}
+
+ virtual void setTransportListener( TransportListener* listener ) {
+ this->listener = listener;
+ }
+
+ virtual void start() throw( cms::CMSException ){
+ close();
+
+ done = false;
+ thread = new decaf::lang::Thread( this );
+ thread->start();
+ }
+
+ virtual void close() throw( cms::CMSException ){
+
+ done = true;
+
+ if( thread != NULL ){
+ synchronized( &mutex ){
+ mutex.notifyAll();
+ }
+ thread->join();
+ delete thread;
+ thread = NULL;
+ }
+ }
+
+ virtual Pointer<Response> createResponse( const Pointer<Command>& command ){
+
+ Pointer<Response> resp( new commands::Response() );
+ resp->setCorrelationId( command->getCommandId() );
+ resp->setResponseRequired( false );
+ return resp;
+ }
+
+ virtual void run(){
+
+ try{
+
+ synchronized(&startedMutex)
+ {
+ startedMutex.notifyAll();
+ }
+
+ synchronized( &mutex ){
+
+ while( !done ){
+
+ if( requests.empty() ){
+ mutex.wait();
+ }else{
+
+ Pointer<Command> cmd = requests.front();
+ requests.pop();
+
+ // Only send a response if one is required.
+ Pointer<Response> resp;
+ if( cmd->isResponseRequired() ){
+ resp = createResponse( cmd );
+ }
+
+ mutex.unlock();
+
+ // Send both the response and the original
+ // command back to the correlator.
+ if( listener != NULL ){
+ if( resp != NULL ){
+ listener->onCommand( resp );
+ }
+ listener->onCommand( cmd );
+ }
+
+ mutex.lock();
+ }
+ }
+ }
+ }catch( exceptions::ActiveMQException& ex ){
+ if( listener ){
+ listener->onTransportException( this, ex );
+ }
+ }
+ catch( ... ){
+ if( listener ){
+ exceptions::ActiveMQException ex( __FILE__, __LINE__, "stuff" );
+ listener->onTransportException( this, ex );
+ }
+ }
+ }
+
+ virtual Transport* narrow( const std::type_info& typeId ) {
+ if( typeid( *this ) == typeId ) {
+ return this;
+ }
+
+ return NULL;
+ }
+
+ virtual bool isFaultTolerant() const {
+ return true;
+ }
+
+ virtual bool isConnected() const {
+ return false;
+ }
+
+ virtual bool isClosed() const {
+ return false;
+ }
+
+ };
+
+ class MyBrokenTransport : public MyTransport{
+ public:
+
+ MyBrokenTransport(){}
+ virtual ~MyBrokenTransport(){}
+
+ virtual commands::Response* createResponse(commands:: Command* command AMQCPP_UNUSED){
+ throw exceptions::ActiveMQException( __FILE__, __LINE__,
+ "bad stuff" );
+ }
+ };
+
+ class MyListener : public DefaultTransportListener {
+ public:
+
+ int exCount;
+ std::set<int> commands;
+ decaf::util::concurrent::Mutex mutex;
+
+ public:
+
+ MyListener(){
+ exCount = 0;
+ }
+ virtual ~MyListener(){}
+ virtual void onCommand( commands::Command* command ){
+
+ synchronized( &mutex ){
+ commands.insert( command->getCommandId() );
+
+ mutex.notify();
+ }
+ }
+
+ virtual void onTransportException(
+ Transport* source AMQCPP_UNUSED,
+ const decaf::lang::Exception& ex AMQCPP_UNUSED)
+ {
+ synchronized( &mutex ){
+ exCount++;
+ }
+ }
+
+ };
+
+ class RequestThread : public decaf::lang::Thread{
+ public:
+
+ Transport* transport;
+ Pointer<MyCommand> cmd;
+ Pointer<Response> resp;
+
+ public:
+
+ RequestThread(){
+ transport = NULL;
+ cmd.reset( new MyCommand() );
+ }
+
+ virtual ~RequestThread(){
+ join();
+ }
+
+ void setTransport( Transport* transport ){
+ this->transport = transport;
+ }
+
+ void run(){
+
+ try{
+ resp = transport->request(cmd);
+ }catch( ... ){
+ CPPUNIT_ASSERT( false );
+ }
+ }
+ };
+
+}}}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelatorTest::testBasics() {
try{
MyListener listener;
- MyTransport transport;
- ResponseCorrelator correlator( &transport, false );
+ Pointer<MyTransport> transport( new MyTransport() );
+ ResponseCorrelator correlator( transport );
correlator.setTransportListener( &listener );
- CPPUNIT_ASSERT( transport.listener == &correlator );
+ CPPUNIT_ASSERT( transport->listener == &correlator );
// Give the thread a little time to get up and running.
- synchronized(&transport.startedMutex)
- {
+ synchronized( &(transport->startedMutex) ) {
// Start the transport.
correlator.start();
- transport.startedMutex.wait();
+ transport->startedMutex.wait();
}
// Send one request.
- MyCommand cmd;
- commands::Response* resp = correlator.request( &cmd );
+ Pointer<MyCommand> cmd( new MyCommand );
+ Pointer<Response> resp = correlator.request( cmd );
CPPUNIT_ASSERT( resp != NULL );
- CPPUNIT_ASSERT( resp->getCorrelationId() == cmd.getCommandId() );
+ CPPUNIT_ASSERT( resp->getCorrelationId() == cmd->getCommandId() );
// Wait to get the message back asynchronously.
decaf::lang::Thread::sleep( 100 );
@@ -56,9 +330,6 @@
CPPUNIT_ASSERT( listener.exCount == 0 );
correlator.close();
-
- // Destroy the response.
- delete resp;
}
AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
@@ -70,25 +341,25 @@
try{
MyListener listener;
- MyTransport transport;
- ResponseCorrelator correlator( &transport, false );
+ Pointer<MyTransport> transport( new MyTransport() );
+ ResponseCorrelator correlator( transport );
correlator.setTransportListener( &listener );
- CPPUNIT_ASSERT( transport.listener == &correlator );
+ CPPUNIT_ASSERT( transport->listener == &correlator );
// Give the thread a little time to get up and running.
- synchronized(&transport.startedMutex)
- {
+ synchronized( &(transport->startedMutex) ) {
+
// Start the transport.
correlator.start();
- transport.startedMutex.wait();
+ transport->startedMutex.wait();
}
// Send many oneway request (we'll get them back asynchronously).
const unsigned int numCommands = 1000;
- MyCommand commands[numCommands];
- for( unsigned int ix=0; ix<numCommands; ix++ ){
- correlator.oneway( &commands[ix] );
+ for( unsigned int ix = 0; ix < numCommands; ++ix ) {
+ Pointer<MyCommand> command( new MyCommand() );
+ correlator.oneway( command );
}
// Give the thread a little time to get all the messages back.
@@ -110,24 +381,23 @@
try{
MyListener listener;
- MyBrokenTransport transport;
- ResponseCorrelator correlator( &transport, false );
+ Pointer<MyBrokenTransport> transport( new MyBrokenTransport() );
+ ResponseCorrelator correlator( transport );
correlator.setTransportListener( &listener );
- CPPUNIT_ASSERT( transport.listener == &correlator );
+ CPPUNIT_ASSERT( transport->listener == &correlator );
// Give the thread a little time to get up and running.
- synchronized(&transport.startedMutex)
- {
+ synchronized( &(transport->startedMutex) ) {
// Start the transport.
correlator.start();
- transport.startedMutex.wait();
+ transport->startedMutex.wait();
}
// Send one request.
- MyCommand cmd;
+ Pointer<MyCommand> cmd( new MyCommand );
try{
- correlator.request( &cmd, 500 );
+ correlator.request( cmd, 500 );
CPPUNIT_ASSERT(false);
}catch( CommandIOException& ex ){
// Expected.
@@ -154,21 +424,20 @@
try{
MyListener listener;
- MyTransport transport;
- ResponseCorrelator correlator( &transport, false );
+ Pointer<MyTransport> transport( new MyTransport() );
+ ResponseCorrelator correlator( transport );
correlator.setTransportListener( &listener );
- CPPUNIT_ASSERT( transport.listener == &correlator );
+ CPPUNIT_ASSERT( transport->listener == &correlator );
// Start the transport.
correlator.start();
// Make sure the start command got down to the thread.
- CPPUNIT_ASSERT( transport.thread != NULL );
+ CPPUNIT_ASSERT( transport->thread != NULL );
// Give the thread a little time to get up and running.
- synchronized(&transport.startedMutex)
- {
- transport.startedMutex.wait(500);
+ synchronized( &(transport->startedMutex) ) {
+ transport->startedMutex.wait(500);
}
// Start all the requester threads.
@@ -184,7 +453,8 @@
for( unsigned int ix=0; ix<numRequests; ++ix ){
requesters[ix].join();
CPPUNIT_ASSERT( requesters[ix].resp != NULL );
- CPPUNIT_ASSERT( requesters[ix].cmd.getCommandId() == requesters[ix].resp->getCorrelationId() );
+ CPPUNIT_ASSERT( requesters[ix].cmd->getCommandId() ==
+ requesters[ix].resp->getCorrelationId() );
}
decaf::lang::Thread::sleep( 60 );
@@ -219,17 +489,17 @@
////////////////////////////////////////////////////////////////////////////////
void ResponseCorrelatorTest::testNarrow(){
- MyTransport transport;
- ResponseCorrelator correlator( &transport, false );
+ Pointer<MyTransport> transport( new MyTransport() );
+ ResponseCorrelator correlator( transport );
Transport* narrowed = correlator.narrow( typeid( transport ) );
- CPPUNIT_ASSERT( narrowed == &transport );
+ CPPUNIT_ASSERT( narrowed == transport );
narrowed = correlator.narrow( typeid( std::string() ) );
CPPUNIT_ASSERT( narrowed == NULL );
narrowed = correlator.narrow( typeid( MyTransport ) );
- CPPUNIT_ASSERT( narrowed == &transport );
+ CPPUNIT_ASSERT( narrowed == transport );
narrowed = correlator.narrow( typeid( transport::correlator::ResponseCorrelator ) );
CPPUNIT_ASSERT( narrowed == &correlator );
Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h Thu Feb 19 01:06:07 2009
@@ -21,15 +21,6 @@
#include <cppunit/TestFixture.h>
#include <cppunit/extensions/HelperMacros.h>
-#include <activemq/util/Config.h>
-#include <activemq/commands/BaseCommand.h>
-#include <activemq/transport/DefaultTransportListener.h>
-#include <activemq/transport/correlator/ResponseCorrelator.h>
-#include <decaf/lang/Thread.h>
-#include <decaf/util/concurrent/Concurrent.h>
-#include <decaf/lang/exceptions/UnsupportedOperationException.h>
-#include <queue>
-
namespace activemq{
namespace transport{
namespace correlator{
@@ -46,294 +37,6 @@
public:
- class MyCommand : public commands::BaseCommand{
- private:
-
- unsigned int commandId;
-
- public:
-
- virtual std::string toString() const{ return ""; }
-
- virtual unsigned char getDataStructureType() const { return 1; }
-
- virtual decaf::lang::Pointer<commands::Command> visit( activemq::state::CommandVisitor* visitor )
- throw( exceptions::ActiveMQException ) { return decaf::lang::Pointer<commands::Command>(); }
-
- virtual MyCommand* cloneDataStructure() const{
- MyCommand* command = new MyCommand;
- command->setCommandId( this->getCommandId() );
- command->setResponseRequired( this->isResponseRequired() );
- return command;
- }
- };
-
- class MyTransport
- :
- public Transport,
- public decaf::lang::Runnable{
- public:
- TransportListener* listener;
- decaf::lang::Thread* thread;
- decaf::util::concurrent::Mutex mutex;
- decaf::util::concurrent::Mutex startedMutex;
- bool done;
- std::queue<commands::Command*> requests;
-
- public:
-
- MyTransport(){
- listener = NULL;
- thread = NULL;
- done = false;
- }
-
- virtual ~MyTransport(){
-
- close();
- }
-
- virtual void oneway( commands::Command* command )
- throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
- {
- synchronized( &mutex ){
- requests.push( command );
- mutex.notifyAll();
- }
- }
-
- virtual commands::Response* request( commands::Command* command AMQCPP_UNUSED)
- throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
- {
- throw decaf::lang::exceptions::UnsupportedOperationException(
- __FILE__,
- __LINE__,
- "stuff" );
- }
-
- virtual commands::Response* request( commands::Command* command AMQCPP_UNUSED,
- unsigned int timeout AMQCPP_UNUSED )
- throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
- {
- throw decaf::lang::exceptions::UnsupportedOperationException(
- __FILE__,
- __LINE__,
- "stuff" );
- }
-
- virtual void setWireFormat( wireformat::WireFormat* wireFormat ) {
-
- }
-
- virtual void setTransportListener( TransportListener* listener ) {
- this->listener = listener;
- }
-
- virtual void start() throw( cms::CMSException ){
- close();
-
- done = false;
-
- thread = new decaf::lang::Thread( this );
- thread->start();
- }
-
- virtual void close() throw( cms::CMSException ){
-
- done = true;
-
- if( thread != NULL ){
- synchronized( &mutex ){
- mutex.notifyAll();
- }
- thread->join();
- delete thread;
- thread = NULL;
- }
- }
-
- virtual commands::Response* createResponse( commands::Command* command ){
-
- commands::Response* resp = new commands::Response();
- resp->setCorrelationId( command->getCommandId() );
- resp->setResponseRequired( false );
- return resp;
- }
-
- virtual void run(){
-
- try{
-
- synchronized(&startedMutex)
- {
- startedMutex.notifyAll();
- }
-
- synchronized( &mutex ){
-
- while( !done ){
-
- if( requests.empty() ){
- mutex.wait();
- }else{
-
- commands::Command* cmd = requests.front();
- requests.pop();
-
- // Only send a response if one is required.
- commands::Response* resp = NULL;
- if( cmd->isResponseRequired() ){
- resp = createResponse( cmd );
- }
-
- mutex.unlock();
-
- // Send both the response and the original
- // command back to the correlator.
- if( listener != NULL ){
- if( resp != NULL ){
- listener->onCommand( resp );
- }
- listener->onCommand( cmd );
- }
-
- mutex.lock();
- }
- }
- }
- }catch( exceptions::ActiveMQException& ex ){
- if( listener ){
- listener->onTransportException( this, ex );
- }
- }
- catch( ... ){
- if( listener ){
- exceptions::ActiveMQException ex( __FILE__, __LINE__, "stuff" );
- listener->onTransportException( this, ex );
- }
- }
- }
-
- virtual Transport* narrow( const std::type_info& typeId ) {
- if( typeid( *this ) == typeId ) {
- return this;
- }
-
- return NULL;
- }
-
- /**
- * Is this Transport fault tolerant, meaning that it will reconnect to
- * a broker on disconnect.
- *
- * @returns true if the Transport is fault tolerant.
- */
- virtual bool isFaultTolerant() const {
- return true;
- }
-
- /**
- * Is the Transport Connected to its Broker.
- *
- * @returns true if a connection has been made.
- */
- virtual bool isConnected() const {
- return false;
- }
-
- /**
- * Has the Transport been shutdown and no longer usable.
- *
- * @returns true if the Transport
- */
- virtual bool isClosed() const {
- return false;
- }
-
- };
-
- class MyBrokenTransport : public MyTransport{
- public:
-
- MyBrokenTransport(){}
- virtual ~MyBrokenTransport(){}
-
- virtual commands::Response* createResponse(commands:: Command* command AMQCPP_UNUSED){
- throw exceptions::ActiveMQException( __FILE__, __LINE__,
- "bad stuff" );
- }
- };
-
- class MyListener : public DefaultTransportListener {
- public:
-
- int exCount;
- std::set<int> commands;
- decaf::util::concurrent::Mutex mutex;
-
- public:
-
- MyListener(){
- exCount = 0;
- }
- virtual ~MyListener(){}
- virtual void onCommand( commands::Command* command ){
-
- synchronized( &mutex ){
- commands.insert( command->getCommandId() );
-
- mutex.notify();
- }
- }
-
- virtual void onTransportException(
- Transport* source AMQCPP_UNUSED,
- const decaf::lang::Exception& ex AMQCPP_UNUSED)
- {
- synchronized( &mutex ){
- exCount++;
- }
- }
-
- };
-
- class RequestThread : public decaf::lang::Thread{
- public:
-
- Transport* transport;
- MyCommand cmd;
- commands::Response* resp;
- public:
-
- RequestThread(){
- transport = NULL;
- resp = NULL;
- }
- virtual ~RequestThread(){
- join();
-
- if( resp != NULL ){
- delete resp;
- resp = NULL;
- }
- }
-
- void setTransport( Transport* transport ){
- this->transport = transport;
- }
-
- void run(){
-
- try{
- resp = transport->request(&cmd);
- }catch( ... ){
- CPPUNIT_ASSERT( false );
- }
- }
- };
-
- public:
-
virtual ~ResponseCorrelatorTest(){}
void testBasics();
Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/mock/MockTransportFactoryTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/mock/MockTransportFactoryTest.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/mock/MockTransportFactoryTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/mock/MockTransportFactoryTest.cpp Thu Feb 19 01:06:07 2009
@@ -38,11 +38,11 @@
MockTransportFactory factory;
- auto_ptr<Transport> transport( factory.create( uri ) );
+ Pointer<Transport> transport( factory.create( uri ) );
CPPUNIT_ASSERT( transport.get() != NULL );
- transport.reset( factory.createComposite( uri ) );
+ transport = factory.createComposite( uri );
CPPUNIT_ASSERT( transport.get() != NULL );