You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/04/12 22:29:43 UTC
svn commit: r528222 [2/5] - in
/activemq/activemq-cpp/trunk/activemq-cpp/src: examples/ main/
main/activemq/connector/openwire/
main/activemq/connector/openwire/commands/ main/activemq/connector/stomp/
main/activemq/connector/stomp/commands/ main/activ...
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h Thu Apr 12 13:29:39 2007
@@ -26,7 +26,7 @@
namespace connector{
namespace stomp{
namespace commands{
-
+
/**
* Command sent to the broker to unsubscribe to a
* topic or queue.
@@ -34,40 +34,50 @@
class UnsubscribeCommand : public AbstractCommand< transport::Command >
{
public:
-
- UnsubscribeCommand(void) :
+
+ UnsubscribeCommand() :
AbstractCommand< transport::Command >() {
initialize( getFrame() );
}
- UnsubscribeCommand( StompFrame* frame ) :
+
+ UnsubscribeCommand( StompFrame* frame ) :
AbstractCommand< transport::Command >( frame ) {
validate( getFrame() );
}
+
virtual ~UnsubscribeCommand(void) {};
-
+
+ /**
+ * Clone the StompCommand and return the new copy.
+ * @returns new copy of this command caller owns it.
+ */
+ virtual StompCommand* cloneStompCommand() const {
+ return new UnsubscribeCommand( getFrame().clone() );
+ }
+
/**
* Get the destination
* @returns the Destination as a string
- */
- virtual std::string getDestination(void) const{
- return getPropertyValue(
- CommandConstants::toString(
+ */
+ virtual std::string getDestination() const{
+ return getPropertyValue(
+ CommandConstants::toString(
CommandConstants::HEADER_DESTINATION ), "" );
}
-
+
/**
* Set the destination
* @param destination the destiantion as a String
*/
virtual void setDestination( const std::string& destination ){
- setPropertyValue(
- CommandConstants::toString(
+ setPropertyValue(
+ CommandConstants::toString(
CommandConstants::HEADER_DESTINATION ),
destination );
}
-
+
protected:
-
+
/**
* Inheritors are required to override this method to init the
* frame with data appropriate for the command type.
@@ -80,17 +90,17 @@
}
/**
- * Inheritors are required to override this method to validate
+ * Inheritors are required to override this method to validate
* the passed stomp frame before it is marshalled or unmarshaled
* @param frame Frame to validate
* @returns true if frame is valid
*/
virtual bool validate( const StompFrame& frame ) const
{
- if((frame.getCommand() ==
+ if((frame.getCommand() ==
CommandConstants::toString( CommandConstants::UNSUBSCRIBE )) &&
(frame.getProperties().hasProperty(
- CommandConstants::toString(
+ CommandConstants::toString(
CommandConstants::HEADER_DESTINATION ) ) ) )
{
return true;
@@ -100,7 +110,7 @@
}
};
-
+
}}}}
#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_UNSUBSCRIBECOMMAND_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp Thu Apr 12 13:29:39 2007
@@ -18,15 +18,11 @@
#include <activemq/util/Guid.h>
#include <activemq/util/SimpleProperties.h>
-#include <activemq/util/StringTokenizer.h>
#include <activemq/connector/ConnectorFactoryMap.h>
-#include <activemq/network/SocketFactory.h>
-#include <activemq/transport/TransportFactoryMap.h>
-#include <activemq/network/Socket.h>
+#include <activemq/transport/TransportBuilder.h>
#include <activemq/exceptions/NullPointerException.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQConstants.h>
-#include <activemq/util/StringTokenizer.h>
#include <activemq/support/LibraryInit.h>
using namespace std;
@@ -35,7 +31,6 @@
using namespace activemq::util;
using namespace activemq::connector;
using namespace activemq::exceptions;
-using namespace activemq::network;
using namespace activemq::transport;
////////////////////////////////////////////////////////////////////////////////
@@ -100,6 +95,7 @@
ActiveMQConnectionData* connectionData = NULL;
ActiveMQConnection* connection = NULL;
std::string clientIdLocal = clientId;
+ TransportBuilder transportBuilder;
try
{
@@ -122,23 +118,9 @@
ActiveMQConstants::toString(
ActiveMQConstants::PARAM_CLIENTID ), clientIdLocal );
- // Parse out the properties from the URI
- parseURL( url, *properties );
+ // Use the TransportBuilder to get our Transport
+ transport = transportBuilder.buildTransport( url, *properties );
- // Create the Transport that the Connector will use.
- string factoryName =
- properties->getProperty( "transport", "tcp" );
- TransportFactory* factory =
- TransportFactoryMap::getInstance().lookup( factoryName );
- if( factory == NULL ){
- throw ActiveMQException(
- __FILE__, __LINE__,
- "ActiveMQConnectionFactory::createConnection - "
- "unknown transport factory" );
- }
-
- // Create the transport.
- transport = factory->createTransport( *properties );
if( transport == NULL ){
throw ActiveMQException(
__FILE__, __LINE__,
@@ -210,63 +192,4 @@
throw ex;
}
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnectionFactory::parseURL( const std::string& URI,
- Properties& properties )
- throw ( exceptions::IllegalArgumentException )
-{
- try
- {
- StringTokenizer tokenizer( URI, ":/" );
-
- std::vector<std::string> tokens;
-
- // Require that there be three tokens at the least, these are
- // transport, url, port.
- if( tokenizer.countTokens() < 3 )
- {
- throw exceptions::IllegalArgumentException(
- __FILE__, __LINE__,
- (string( "ActiveMQConnectionFactory::parseURL - "
- "Marlformed URI: ") + URI).c_str() );
- }
-
- // First element should be the Transport Type, following that is the
- // URL and any params.
- properties.setProperty( "transport", tokenizer.nextToken() );
-
- // Parse URL and Port as one item, optional params follow the ?
- // and then each param set is delimited with & we extract first
- // three chars as they are the left over ://
- properties.setProperty( "uri", tokenizer.nextToken("&?").substr(3) );
-
- // Now get all the optional parameters and store them as properties
- int count = tokenizer.toArray( tokens );
-
- for( int i = 0; i < count; ++i )
- {
- tokenizer.reset( tokens[i], "=" );
-
- if( tokenizer.countTokens() != 2 )
- {
- throw exceptions::IllegalArgumentException(
- __FILE__, __LINE__,
- ( string( "ActiveMQConnectionFactory::parseURL - "
- "Marlformed Parameter = " ) + tokens[i] ).c_str() );
- }
-
- // Get them in order, passing both as nextToken calls in the
- // set Property can cause reversed order.
- string key = tokenizer.nextToken();
- string value = tokenizer.nextToken();
-
- // Store this param as a property
- properties.setProperty( key, value );
- }
- }
- AMQ_CATCH_RETHROW( IllegalArgumentException )
- AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IllegalArgumentException )
- AMQ_CATCHALL_THROW( IllegalArgumentException )
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h Thu Apr 12 13:29:39 2007
@@ -20,8 +20,6 @@
#include <cms/ConnectionFactory.h>
#include <cms/Connection.h>
-#include <activemq/exceptions/IllegalArgumentException.h>
-
namespace activemq{
namespace core{
@@ -43,7 +41,7 @@
public:
- ActiveMQConnectionFactory();
+ ActiveMQConnectionFactory();
/**
* Constructor
@@ -55,7 +53,7 @@
const std::string& username = "",
const std::string& password = "" );
- virtual ~ActiveMQConnectionFactory() {}
+ virtual ~ActiveMQConnectionFactory() {}
/**
* Creates a connection with the default user identity. The
@@ -174,19 +172,6 @@
const std::string& password,
const std::string& clientId = "" )
throw ( cms::CMSException );
-
- protected:
-
- /**
- * Parses the properties out of the provided Broker URI and sets
- * them in the passed Properties Object.
- * @param URI a Broker URI to parse
- * @param properties a Properties object to set the parsed values in
- * @throws IllegalArgumentException if the passed URI is invalid
- */
- static void parseURL( const std::string& URI,
- util::Properties& properties )
- throw ( exceptions::IllegalArgumentException );
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Thu Apr 12 13:29:39 2007
@@ -36,7 +36,6 @@
using namespace activemq::util;
using namespace activemq::connector;
using namespace activemq::exceptions;
-using namespace activemq::concurrent;
////////////////////////////////////////////////////////////////////////////////
ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
@@ -52,17 +51,8 @@
this->sessionInfo = sessionInfo;
this->transaction = NULL;
- this->connection = connection;
+ this->connection = connection;
this->closed = false;
- this->asyncThread = NULL;
- this->useAsyncSend = Boolean::parseBoolean(
- properties.getProperty( "useAsyncSend", "false" ) );
-
- // If we are in Async Send Mode we need to start the Thread
- // otherwise we don't need to do anything.
- if( this->useAsyncSend == true ) {
- this->startThread();
- }
// Create a Transaction object only if the session is transactional
if( isTransacted() )
@@ -137,12 +127,6 @@
// Now indicate that this session is closed.
closed = true;
- // Stop the Async Thread if its running
- stopThread();
-
- // Remove any unsent cloned messages.
- purgeMessages();
-
delete executor;
executor = NULL;
}
@@ -684,21 +668,9 @@
"ActiveMQSession::onProducerClose - Session Already Closed" );
}
- if( useAsyncSend ) {
-
- // Put it in the send queue, thread will dispatch it. We clone it
- // in case the client deletes their copy before we get a chance to
- // send it.
- synchronized( &msgQueue ) {
- msgQueue.push( make_pair( message->clone(), producer ) );
- msgQueue.notifyAll();
- }
-
- } else {
- // Send via the connection syncrhronously.
- connection->getConnectionData()->
- getConnector()->send( message, producer->getProducerInfo() );
- }
+ // Send via the connection syncrhronously.
+ connection->getConnectionData()->
+ getConnector()->send( message, producer->getProducerInfo() );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -737,12 +709,12 @@
transaction->removeFromTransaction(
consumer->getConsumerId() );
}
-
+
ActiveMQConsumer* obj = NULL;
synchronized( &consumers ) {
-
+
if( consumers.containsKey( consumer->getConsumerId() ) ) {
-
+
// Get the consumer reference
obj = consumers.getValue( consumer->getConsumerId() );
@@ -750,14 +722,14 @@
consumers.remove( consumer->getConsumerId() );
}
}
-
+
// Clean up any resources in the executor for this consumer
- if( obj != NULL && executor != NULL ) {
-
+ if( obj != NULL && executor != NULL ) {
+
// Purge any pending messages for this consumer.
- vector<ActiveMQMessage*> messages =
+ vector<ActiveMQMessage*> messages =
executor->purgeConsumerMessages(obj);
-
+
// Destroy the messages.
for( unsigned int ix=0; ix<messages.size(); ++ix ) {
delete messages[ix];
@@ -795,125 +767,6 @@
}
return NULL;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::run()
-{
- try{
-
- while( !closed )
- {
- std::pair<Message*, ActiveMQProducer*> messagePair;
-
- synchronized( &msgQueue )
- {
- // Gaurd against spurious wakeup or race to sync lock
- // also if the listner has been unregistered we don't
- // have anyone to notify, so we wait till a new one is
- // registered, and then we will deliver the backlog
- while( msgQueue.empty() )
- {
- if( closed )
- {
- break;
- }
- msgQueue.wait();
- }
-
- // don't want to process messages if we are shutting down.
- if( closed )
- {
- return;
- }
-
- // get the data
- messagePair = msgQueue.pop();
- }
-
- // Dispatch the message
- connection->getConnectionData()->
- getConnector()->send(
- messagePair.first,
- messagePair.second->getProducerInfo() );
-
- // Destroy Our copy of the message
- delete messagePair.first;
- }
- }
- catch(...)
- {
- cms::ExceptionListener* listener = this->getExceptionListener();
-
- if( listener != NULL )
- {
- listener->onException( ActiveMQException(
- __FILE__, __LINE__,
- "ActiveMQSession::run - "
- "Connector threw an unknown Exception, recovering..." ) );
- }
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::startThread() throw ( ActiveMQException ) {
-
- try
- {
- // Start the thread, if it's not already started.
- if( asyncThread == NULL )
- {
- asyncThread = new Thread( this );
- asyncThread->start();
- }
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::stopThread() throw ( ActiveMQException ) {
-
- try
- {
- // if the thread is running signal it to quit and then
- // wait for run to return so thread can die
- if( asyncThread != NULL )
- {
- synchronized( &msgQueue )
- {
- // Force a wakeup if run is in a wait.
- msgQueue.notifyAll();
- }
-
- // Wait for it to die and then delete it.
- asyncThread->join();
- delete asyncThread;
- asyncThread = NULL;
- }
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::purgeMessages() throw ( ActiveMQException )
-{
- try
- {
- synchronized( &msgQueue )
- {
- while( !msgQueue.empty() )
- {
- // destroy these messages if this is not a transacted
- // session, if it is then the tranasction will clean
- // the messages up.
- delete msgQueue.pop().first;
- }
- }
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Thu Apr 12 13:29:39 2007
@@ -19,15 +19,12 @@
#include <cms/Session.h>
#include <cms/ExceptionListener.h>
-#include <activemq/concurrent/Runnable.h>
-#include <activemq/concurrent/Mutex.h>
#include <activemq/connector/SessionInfo.h>
+#include <activemq/connector/ConnectorResourceListener.h>
#include <activemq/core/Dispatcher.h>
+#include <activemq/util/Map.h>
#include <activemq/util/Set.h>
#include <activemq/util/Queue.h>
-#include <activemq/connector/ConnectorResourceListener.h>
-#include <activemq/util/Map.h>
-#include <set>
namespace activemq{
namespace core{
@@ -43,7 +40,6 @@
class ActiveMQSession :
public cms::Session,
public Dispatcher,
- public concurrent::Runnable,
public connector::ConnectorResourceListener
{
private:
@@ -74,27 +70,15 @@
* destination.
*/
util::Set<cms::Closeable*> closableSessionResources;
-
+
/**
* Map of consumers.
*/
util::Map<long long, ActiveMQConsumer*> consumers;
/**
- * Thread to notif a listener if one is added
+ * Sends incoming messages to the registered consumers.
*/
- concurrent::Thread* asyncThread;
-
- /**
- * Is this Session using Async Sends.
- */
- bool useAsyncSend;
-
- /**
- * Outgoing Message Queue
- */
- util::Queue< std::pair<cms::Message*, ActiveMQProducer*> > msgQueue;
-
ActiveMQSessionExecutor* executor;
public:
@@ -104,33 +88,33 @@
ActiveMQConnection* connection );
virtual ~ActiveMQSession();
-
+
util::Map<long long, ActiveMQConsumer*>& getConsumers() {
return consumers;
}
-
+
/**
* Redispatches the given set of unconsumed messages to the consumers.
* @param unconsumedMessages - unconsumed messages to be redelivered.
*/
void redispatch( util::Queue<DispatchData>& unconsumedMessages );
-
+
/**
* Stops asynchronous message delivery.
*/
void start();
-
+
/**
* Starts asynchronous message delivery.
*/
void stop();
-
+
/**
* Indicates whether or not the session is currently in the started
* state.
*/
bool isStarted() const;
-
+
bool isAutoAcknowledge() const {
return sessionInfo->getAckMode() == cms::Session::AUTO_ACKNOWLEDGE;
}
@@ -140,14 +124,14 @@
bool isClientAcknowledge() const {
return sessionInfo->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE;
}
-
+
/**
* Fires the given exception to the exception listener of the connection
*/
void fire( exceptions::ActiveMQException& ex );
-
+
public: // Methods from ActiveMQMessageDispatcher
-
+
/**
* Dispatches a message to a particular consumer.
* @param message - the message to be dispatched
@@ -327,21 +311,21 @@
* @return transacted true - false.
*/
virtual bool isTransacted() const;
-
+
/**
- * Unsubscribes a durable subscription that has been created by a
+ * Unsubscribes a durable subscription that has been created by a
* client.
- *
- * This method deletes the state being maintained on behalf of the
- * subscriber by its provider. It is erroneous for a client to delete a
- * durable subscription while there is an active MessageConsumer or
- * Subscriber for the subscription, or while a consumed message is
- * part of a pending transaction or has not been acknowledged in the
+ *
+ * This method deletes the state being maintained on behalf of the
+ * subscriber by its provider. It is erroneous for a client to delete a
+ * durable subscription while there is an active MessageConsumer or
+ * Subscriber for the subscription, or while a consumed message is
+ * part of a pending transaction or has not been acknowledged in the
* session.
* @param name the name used to identify this subscription
* @throws CMSException
*/
- virtual void unsubscribe( const std::string& name )
+ virtual void unsubscribe( const std::string& name )
throw ( cms::CMSException );
public: // ActiveMQSession specific Methods
@@ -395,33 +379,6 @@
const connector::ConnectorResource* resource ) throw ( cms::CMSException );
protected:
-
- /**
- * Run method that is called from the Thread class when this object
- * is registered with a Thread and started. This function reads from
- * the outgoing message queue and dispatches calls to the connector that
- * is registered with this class.
- */
- virtual void run();
-
- /**
- * Starts the message processing thread to receive messages
- * asynchronously. This thread is started when setMessageListener
- * is invoked, which means that the caller is choosing to use this
- * consumer asynchronously instead of synchronously (receive).
- */
- void startThread() throw ( exceptions::ActiveMQException );
-
- /**
- * Stops the asynchronous message processing thread if it's started.
- */
- void stopThread() throw ( exceptions::ActiveMQException );
-
- /**
- * Purges all messages currently in the queue. This can be as a
- * result of a rollback, or of the consumer being shutdown.
- */
- virtual void purgeMessages() throw ( exceptions::ActiveMQException );
/**
* Given a ConnectorResource pointer, this method will add it to the map
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp Thu Apr 12 13:29:39 2007
@@ -27,72 +27,74 @@
using namespace activemq::exceptions;
////////////////////////////////////////////////////////////////////////////////
-Socket* SocketFactory::createSocket(const Properties& properties)
- throw ( SocketException )
+Socket* SocketFactory::createSocket(
+ const std::string& uri,
+ const Properties& properties)
+ throw ( SocketException )
{
try
{
- const char* uri = properties.getProperty( "uri" );
- if( uri == NULL )
+ // Ensure something is actually passed in for the URI
+ if( uri == "" )
{
- throw SocketException( __FILE__, __LINE__,
+ throw SocketException( __FILE__, __LINE__,
"SocketTransport::start() - uri not provided" );
}
string dummy = uri;
-
+
// Extract the port.
std::size_t portIx = dummy.find( ':' );
if( portIx == string::npos )
{
- throw SocketException( __FILE__, __LINE__,
- "SocketTransport::start() - uri malformed - port not specified: %s", uri);
+ throw SocketException( __FILE__, __LINE__,
+ "SocketTransport::start() - uri malformed - port not specified: %s", uri.c_str() );
}
string host = dummy.substr( 0, portIx );
string portString = dummy.substr( portIx + 1 );
int port;
if( sscanf( portString.c_str(), "%d", &port) != 1 )
{
- throw SocketException( __FILE__, __LINE__,
- "SocketTransport::start() - unable to extract port from uri: %s", uri);
+ throw SocketException( __FILE__, __LINE__,
+ "SocketTransport::start() - unable to extract port from uri: %s", uri.c_str() );
}
-
+
// Get the read buffer size.
int inputBufferSize = 10000;
- dummy = properties.getProperty( "inputBufferSize", "10000" );
+ dummy = properties.getProperty( "inputBufferSize", "10000" );
sscanf( dummy.c_str(), "%d", &inputBufferSize );
-
+
// Get the write buffer size.
int outputBufferSize = 10000;
- dummy = properties.getProperty( "outputBufferSize", "10000" );
+ dummy = properties.getProperty( "outputBufferSize", "10000" );
sscanf( dummy.c_str(), "%d", &outputBufferSize );
-
+
// Get the linger flag.
int soLinger = 0;
- dummy = properties.getProperty( "soLinger", "0" );
- sscanf( dummy.c_str(), "%d", &soLinger );
-
+ dummy = properties.getProperty( "soLinger", "0" );
+ sscanf( dummy.c_str(), "%d", &soLinger );
+
// Get the keepAlive flag.
- bool soKeepAlive =
- properties.getProperty( "soKeepAlive", "false" ) == "true";
-
+ bool soKeepAlive =
+ properties.getProperty( "soKeepAlive", "false" ) == "true";
+
// Get the socket receive buffer size.
int soReceiveBufferSize = -1;
- dummy = properties.getProperty( "soReceiveBufferSize", "-1" );
+ dummy = properties.getProperty( "soReceiveBufferSize", "-1" );
sscanf( dummy.c_str(), "%d", &soReceiveBufferSize );
-
+
// Get the socket send buffer size.
int soSendBufferSize = -1;
- dummy = properties.getProperty( "soSendBufferSize", "-1" );
+ dummy = properties.getProperty( "soSendBufferSize", "-1" );
sscanf( dummy.c_str(), "%d", &soSendBufferSize );
-
+
// Now that we have all the elements that we wanted - let's do it!
// Create a TCP Socket and then Wrap it in a buffered socket
// so that users get the benefit of buffered reads and writes.
// The buffered socket will own the TcpSocket instance, and will
// clean it up when it is cleaned up.
TcpSocket* tcpSocket = new TcpSocket();
- /*BufferedSocket* bufferedSocket =
+ /*BufferedSocket* bufferedSocket =
new BufferedSocket(tcpSocket, inputBufferSize, outputBufferSize);*/
try
@@ -118,7 +120,7 @@
try{
delete tcpSocket;
} catch( SocketException& ex2 ){ /* Absorb */ }
-
+
throw ex;
}
@@ -126,5 +128,5 @@
}
AMQ_CATCH_RETHROW( SocketException )
AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, SocketException )
- AMQ_CATCHALL_THROW( SocketException )
+ AMQ_CATCHALL_THROW( SocketException )
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.h Thu Apr 12 13:29:39 2007
@@ -24,7 +24,7 @@
namespace network{
class Socket;
-
+
/**
* Socket Factory implementation for use in Creating Sockets
* <p>
@@ -32,7 +32,6 @@
* Property Options: <p>
* Name Value <p>
* ------------------------------------- <p>
- * uri The uri for the transport connection. Must be provided.<p>
* inputBufferSize size in bytes of the buffered input stream buffer. Defaults to 10000.<p>
* outputBufferSize size in bytes of the buffered output stream buffer. Defaults to 10000.<p>
* soLinger linger time for the socket (in microseconds). Defaults to 0.<p>
@@ -40,24 +39,26 @@
* soReceiveBufferSize The size of the socket receive buffer (in bytes). Defaults to 2MB.<p>
* soSendBufferSize The size of the socket send buffer (in bytes). Defaults to 2MB.<p>
* soTimeout The timeout of socket IO operations (in microseconds). Defaults to 10000<p>
- *
+ *
* @see <code>Socket</code>
*/
class SocketFactory
{
public:
- virtual ~SocketFactory();
-
+ virtual ~SocketFactory();
+
/**
* Creates and returns a Socket dervied Object based on the values
* defined in the Properties Object that is passed in.
+ * @param the URI for the Socket Connection.
* @param properties a IProperties pointer.
* @throws SocketException.
*/
- static Socket* createSocket( const util::Properties& properties )
+ static Socket* createSocket( const std::string& uri,
+ const util::Properties& properties )
throw ( SocketException );
-
+
};
}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp Thu Apr 12 13:29:39 2007
@@ -18,7 +18,10 @@
#include <activemq/logger/LogWriter.h>
#include <activemq/transport/IOTransportFactory.h>
-#include <activemq/transport/TcpTransportFactory.h>
+#include <activemq/transport/filters/AsyncSendTransportFactory.h>
+#include <activemq/transport/filters/TcpTransportFactory.h>
+#include <activemq/transport/filters/LoggingTransportFactory.h>
+#include <activemq/transport/filters/ResponseCorrelatorFactory.h>
#include <activemq/connector/stomp/StompConnectorFactory.h>
#include <activemq/connector/openwire/OpenWireConnectorFactory.h>
@@ -35,7 +38,10 @@
logger::LogWriter::getInstance();
connector::stomp::StompConnectorFactory::getInstance();
connector::openwire::OpenWireConnectorFactory::getInstance();
- transport::TcpTransportFactory::getInstance();
+ transport::filters::TcpTransportFactory::getInstance();
+ transport::filters::AsyncSendTransportFactory::getInstance();
+ transport::filters::LoggingTransportFactory::getInstance();
+ transport::filters::ResponseCorrelatorFactory::getInstance();
transport::IOTransportFactory::getInstance();
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Command.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Command.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Command.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Command.h Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#ifndef ACTIVEMQ_TRANSPORT_COMMAND_H_
#define ACTIVEMQ_TRANSPORT_COMMAND_H_
@@ -22,12 +22,12 @@
namespace activemq{
namespace transport{
-
+
class Command{
public:
-
+
virtual ~Command(){}
-
+
/**
* Sets the Command Id of this Message
* @param id Command Id
@@ -39,7 +39,7 @@
* @return Command Id
*/
virtual int getCommandId() const = 0;
-
+
/**
* Set if this Message requires a Response
* @param required true if response is required
@@ -51,15 +51,22 @@
* @return true if a response is required.
*/
virtual bool isResponseRequired() const = 0;
-
+
/**
* Returns a provider-specific string that provides information
* about the contents of the command.
*/
virtual std::string toString() const = 0;
-
+
+ /**
+ * Returns a Cloned copy of this command, the caller is responsible
+ * for deallocating the returned object.
+ * @returns new copy of this command.
+ */
+ virtual Command* cloneCommand() const = 0;
+
};
-
+
}}
#endif /*ACTIVEMQ_TRANSPORT_COMMAND_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandIOException.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandIOException.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandIOException.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandIOException.h Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#ifndef ACTIVEMQ_TRANSPORT_COMMANDIOEXCEPTION_H_
#define ACTIVEMQ_TRANSPORT_COMMANDIOEXCEPTION_H_
@@ -23,7 +23,7 @@
namespace activemq{
namespace transport{
-
+
class CommandIOException : public io::IOException{
public:
@@ -31,7 +31,7 @@
* Default Constructor
*/
CommandIOException() throw() {};
-
+
/**
* Copy Constructor
* @param ex the exception to copy
@@ -41,7 +41,7 @@
{
*(exceptions::ActiveMQException*)this = ex;
}
-
+
/**
* Copy Constructor
* @param ex the exception to copy, which is an instance of this type
@@ -51,25 +51,25 @@
{
*(exceptions::ActiveMQException*)this = ex;
}
-
+
/**
* Consturctor
* @param file name of the file were the exception occured.
* @param lineNumber line where the exception occured
* @param msg the message that was generated
*/
- CommandIOException( const char* file, const int lineNumber,
+ CommandIOException( const char* file, const int lineNumber,
const char* msg, ... ) throw()
: io::IOException()
{
va_list vargs;
va_start( vargs, msg );
buildMessage( msg, vargs );
-
+
// Set the first mark for this exception.
setMark( file, lineNumber );
}
-
+
/**
* Clones this exception. This is useful for cases where you need
* to preserve the type of the original exception as well as the message.
@@ -79,10 +79,10 @@
virtual exceptions::ActiveMQException* clone() const{
return new CommandIOException( *this );
}
-
+
virtual ~CommandIOException() throw() {}
};
-
+
}}
#endif /*ACTIVEMQ_TRANSPORT_COMMANDIOEXCEPTION_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandListener.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandListener.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandListener.h Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#ifndef ACTIVEMQ_TRANSPORT_COMMANDLISTENER_H_
#define ACTIVEMQ_TRANSPORT_COMMANDLISTENER_H_
@@ -22,23 +22,23 @@
namespace activemq{
namespace transport{
-
+
/**
* Interface for an observer of broker commands.
*/
class CommandListener{
public:
-
- virtual ~CommandListener(void){}
-
+
+ virtual ~CommandListener() {}
+
/**
* Event handler for the receipt of a command.
* @param command the received command object.
*/
virtual void onCommand( Command* command ) = 0;
-
+
};
-
+
}}
#endif /*ACTIVEMQ_TRANSPORT_COMMANDLISTENER_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandReader.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandReader.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandReader.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandReader.h Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#ifndef ACTIVEMQ_TRANSPORT_COMMANDREADER_H_
#define ACTIVEMQ_TRANSPORT_COMMANDREADER_H_
@@ -24,7 +24,7 @@
namespace activemq{
namespace transport{
-
+
/**
* Interface for an object responsible for reading a command
* from an input stream.
@@ -32,19 +32,19 @@
class CommandReader : public io::Reader
{
public:
-
+
virtual ~CommandReader(){}
-
+
/**
* Reads a command from the given input stream.
* @return The next command available on the stream.
* @throws CommandIOException if a problem occurs during the read.
*/
- virtual Command* readCommand()
+ virtual Command* readCommand()
throw ( CommandIOException ) = 0;
};
-
+
}}
#endif /*ACTIVEMQ_COMMANDS_COMMANDREADER_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandWriter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandWriter.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandWriter.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CommandWriter.h Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#ifndef ACTIVEMQ_TRANSPORT_COMMANDWRITER_H_
#define ACTIVEMQ_TRANSPORT_COMMANDWRITER_H_
@@ -25,7 +25,7 @@
namespace activemq{
namespace transport{
-
+
/**
* Interface for an object responsible for writing a command
* to an output stream.
@@ -33,19 +33,19 @@
class CommandWriter : public io::Writer
{
public:
-
+
virtual ~CommandWriter() {}
-
+
/**
* Writes a command to the given output stream.
* @param command the command to write.
* @throws CommandIOException if a problem occurs during the write.
*/
- virtual void writeCommand( Command* command )
+ virtual void writeCommand( Command* command )
throw ( CommandIOException ) = 0;
};
-
+
}}
#endif /*ACTIVEMQ_TRANSPORT_COMMANDWRITER_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#include "IOTransport.h"
#include "CommandReader.h"
#include "CommandWriter.h"
@@ -31,7 +31,7 @@
////////////////////////////////////////////////////////////////////////////////
IOTransport::IOTransport(){
-
+
listener = NULL;
reader = NULL;
writer = NULL;
@@ -44,40 +44,40 @@
////////////////////////////////////////////////////////////////////////////////
IOTransport::~IOTransport(){
-
+
close();
}
////////////////////////////////////////////////////////////////////////////////
-void IOTransport::oneway( Command* command )
+void IOTransport::oneway( Command* command )
throw(CommandIOException, exceptions::UnsupportedOperationException)
{
if( closed ){
- throw CommandIOException( __FILE__, __LINE__,
+ throw CommandIOException( __FILE__, __LINE__,
"IOTransport::oneway() - transport is closed!" );
}
// Make sure the thread has been started.
if( thread == NULL ){
- throw CommandIOException(
- __FILE__, __LINE__,
+ throw CommandIOException(
+ __FILE__, __LINE__,
"IOTransport::oneway() - transport is not started" );
}
-
+
// Make sure the command object is valid.
if( command == NULL ){
- throw CommandIOException(
- __FILE__, __LINE__,
+ throw CommandIOException(
+ __FILE__, __LINE__,
"IOTransport::oneway() - attempting to write NULL command" );
}
-
+
// Make sure we have an output strema to write to.
if( outputStream == NULL ){
- throw CommandIOException(
- __FILE__, __LINE__,
+ throw CommandIOException(
+ __FILE__, __LINE__,
"IOTransport::oneway() - invalid output stream" );
}
-
+
synchronized( outputStream ){
// Write the command to the output stream.
writer->writeCommand( command );
@@ -86,38 +86,38 @@
////////////////////////////////////////////////////////////////////////////////
void IOTransport::start() throw( cms::CMSException ){
-
+
// Can't restart a closed transport.
if( closed ){
throw CommandIOException( __FILE__, __LINE__, "IOTransport::start() - transport is already closed - cannot restart" );
}
-
+
// If it's already started, do nothing.
if( thread != NULL ){
return;
- }
-
+ }
+
// Make sure all variables that we need have been set.
- if( inputStream == NULL || outputStream == NULL ||
+ if( inputStream == NULL || outputStream == NULL ||
reader == NULL || writer == NULL ){
- throw CommandIOException(
- __FILE__, __LINE__,
+ throw CommandIOException(
+ __FILE__, __LINE__,
"IOTransport::start() - "
"IO sreams and reader/writer must be set before calling start" );
}
-
+
// Init the Command Reader and Writer with the Streams
reader->setInputStream( inputStream );
writer->setOutputStream( outputStream );
-
+
// Start the polling thread.
thread = new Thread( this );
- thread->start();
+ thread->start();
}
////////////////////////////////////////////////////////////////////////////////
void IOTransport::close() throw( cms::CMSException ){
-
+
try{
if( closed ){
return;
@@ -125,28 +125,28 @@
// Mark this transport as closed.
closed = true;
-
+
// We have to close the input stream before
// we stop the thread. this will force us to
// wake up the thread if it's stuck in a read
// (which is likely). Otherwise, the join that
// follows will block forever.
if( inputStream != NULL ){
-
+
inputStream->close();
inputStream = NULL;
}
-
+
// Wait for the thread to die.
if( thread != NULL ){
thread->join();
delete thread;
thread = NULL;
- }
-
+ }
+
// Close the output stream.
if( outputStream != NULL ){
-
+
outputStream->close();
outputStream = NULL;
}
@@ -157,39 +157,40 @@
////////////////////////////////////////////////////////////////////////////////
void IOTransport::run(){
-
+
try{
-
+
while( !closed ){
-
+
// Read the next command from the input stream.
Command* command = reader->readCommand();
-
+
// Notify the listener.
fire( command );
}
-
+
}
catch( exceptions::ActiveMQException& ex ){
- ex.setMark( __FILE__, __LINE__ );
+ ex.setMark( __FILE__, __LINE__ );
fire( ex );
}
catch( ... ){
-
- exceptions::ActiveMQException ex(
- __FILE__, __LINE__,
+
+ exceptions::ActiveMQException ex(
+ __FILE__, __LINE__,
"IOTransport::run - caught unknown exception" );
LOGCMS_WARN(logger, ex.getStackTraceString() );
-
+
fire( ex );
}
}
////////////////////////////////////////////////////////////////////////////////
-Response* IOTransport::request( Command* command AMQCPP_UNUSED )
-throw( CommandIOException, exceptions::UnsupportedOperationException ){
+Response* IOTransport::request( Command* command AMQCPP_UNUSED )
+ throw( CommandIOException, exceptions::UnsupportedOperationException ){
+
throw exceptions::UnsupportedOperationException( __FILE__, __LINE__, "IOTransport::request() - unsupported operation" );
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#ifndef ACTIVEMQ_TRANSPORT_IOTRANSPORT_H_
#define ACTIVEMQ_TRANSPORT_IOTRANSPORT_H_
@@ -29,7 +29,7 @@
namespace activemq{
namespace transport{
-
+
/**
* Implementation of the Transport interface that performs
* marshalling of commands to IO streams. This class does not
@@ -38,7 +38,7 @@
* a command is received, the command listener is notified. The
* polling thread is not started until the start method is called.
* The close method will close the associated streams. Close can
- * be called explicitly by the user, but is also called in the
+ * be called explicitly by the user, but is also called in the
* destructor. Once this object has been closed, it cannot be
* restarted.
*/
@@ -47,53 +47,53 @@
public Transport,
public concurrent::Runnable
{
-
+
LOGCMS_DECLARE( logger )
-
+
private:
-
+
/**
* Listener to incoming commands.
*/
CommandListener* listener;
-
+
/**
* Reads commands from the input stream.
*/
CommandReader* reader;
-
+
/**
* Writes commands to the output stream.
*/
CommandWriter* writer;
-
+
/**
* Listener of exceptions from this transport.
*/
TransportExceptionListener* exceptionListener;
-
+
/**
* The input stream for incoming commands.
*/
io::InputStream* inputStream;
-
+
/**
* The output stream for out-going commands.
*/
io::OutputStream* outputStream;
-
+
/**
* The polling thread.
*/
concurrent::Thread* thread;
-
+
/**
* Flag marking this transport as closed.
*/
bool closed;
-
+
private:
-
+
/**
* Notify the excpetion listener
* @param ex the exception to send
@@ -101,19 +101,19 @@
void fire( exceptions::ActiveMQException& ex ){
if( exceptionListener != NULL ){
-
+
try{
exceptionListener->onTransportException( this, ex );
}catch( ... ){}
- }
+ }
}
-
+
/**
* Notify the command listener.
* @param command the command the send
*/
void fire( Command* command ){
-
+
try{
// Since the listener is responsible for freeing the memory,
// if there is no listener - free the command here.
@@ -121,17 +121,17 @@
delete command;
return;
}
-
+
listener->onCommand( command );
-
+
}catch( ... ){}
}
-
+
public:
-
+
IOTransport();
virtual ~IOTransport();
-
+
/**
* Sends a one-way command. Does not wait for any response from the
* broker.
@@ -142,7 +142,7 @@
* by this transport.
*/
virtual void oneway( Command* command ) throw( CommandIOException, exceptions::UnsupportedOperationException );
-
+
/**
* Not supported by this class - throws an exception.
* @param command the command to be sent.
@@ -150,7 +150,7 @@
* @throws UnsupportedOperationException.
*/
virtual Response* request( Command* command ) throw( CommandIOException, exceptions::UnsupportedOperationException );
-
+
/**
* Assigns the command listener for non-response commands.
* @param listener the listener.
@@ -158,7 +158,7 @@
virtual void setCommandListener( CommandListener* listener ){
this->listener = listener;
}
-
+
/**
* Sets the command reader.
* @param reader the object that will be used for reading command objects.
@@ -166,7 +166,7 @@
virtual void setCommandReader( CommandReader* reader ){
this->reader = reader;
}
-
+
/**
* Sets the command writer.
* @param writer the object that will be used for writing command objects.
@@ -174,7 +174,7 @@
virtual void setCommandWriter( CommandWriter* writer ){
this->writer = writer;
}
-
+
/**
* Sets the observer of asynchronous exceptions from this transport.
* @param listener the listener of transport exceptions.
@@ -182,7 +182,7 @@
virtual void setTransportExceptionListener( TransportExceptionListener* listener ){
this->exceptionListener = listener;
}
-
+
/**
* Sets the input stream for in-coming commands.
* @param is The input stream.
@@ -190,7 +190,7 @@
virtual void setInputStream( io::InputStream* is ){
this->inputStream = is;
}
-
+
/**
* Sets the output stream for out-going commands.
* @param os The output stream.
@@ -198,7 +198,7 @@
virtual void setOutputStream( io::OutputStream* os ){
this->outputStream = os;
}
-
+
/**
* Starts this transport object and creates the thread for
* polling on the input stream for commands. If this object
@@ -209,7 +209,7 @@
* has already been closed.
*/
virtual void start() throw( cms::CMSException );
-
+
/**
* Stops the polling thread and closes the streams. This can
* be called explicitly, but is also called in the destructor. Once
@@ -217,14 +217,14 @@
* @throws CMSException if errors occur.
*/
virtual void close() throw( cms::CMSException );
-
+
/**
* Runs the polling thread.
*/
virtual void run();
-
+
};
-
+
}}
#endif /*ACTIVEMQ_TRANSPORT_IOTRANSPORT_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.cpp Thu Apr 12 13:29:39 2007
@@ -14,25 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#include "IOTransportFactory.h"
#include <activemq/util/Config.h>
+#include <activemq/transport/IOTransport.h>
+using namespace activemq;
using namespace activemq::transport;
////////////////////////////////////////////////////////////////////////////////
-Transport* IOTransportFactory::createTransport(
- const activemq::util::Properties& properties AMQCPP_UNUSED )
-{
+Transport* IOTransportFactory::createTransport(
+ const activemq::util::Properties& properties AMQCPP_UNUSED,
+ Transport* next AMQCPP_UNUSED,
+ bool own AMQCPP_UNUSED ) throw ( exceptions::ActiveMQException ) {
+
+ // IO is the Base Tranport, it can have no next.
return new IOTransport();
}
-
+
////////////////////////////////////////////////////////////////////////////////
-TransportFactory& IOTransportFactory::getInstance(void)
-{
+TransportFactory& IOTransportFactory::getInstance() {
+
// Create the one and only instance of the registrar
static TransportFactoryMapRegistrar registrar(
- "io", new IOTransportFactory());
-
+ "transport.IOTransport", new IOTransportFactory() );
+
return registrar.getFactory();
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.h Thu Apr 12 13:29:39 2007
@@ -18,32 +18,36 @@
#ifndef ACTIVEMQ_TRANSPORT_IOTRANSPORTFACTORY_H_
#define ACTIVEMQ_TRANSPORT_IOTRANSPORTFACTORY_H_
-#include <activemq/transport/IOTransport.h>
#include <activemq/transport/TransportFactory.h>
#include <activemq/transport/TransportFactoryMapRegistrar.h>
namespace activemq{
namespace transport{
-
+
/**
* Manufactures IOTransports, which are objects that
* read from input streams and write to output streams.
*/
class IOTransportFactory : public TransportFactory{
private:
-
+
static TransportFactoryMapRegistrar registrar;
-
+
public:
-
+
virtual ~IOTransportFactory(){}
-
+
/**
* Creates a Transport instance.
- * @param properties The properties for the transport.
+ * @param properties - Object that will hold transport config values
+ * @param next - the next transport in the chain, or NULL
+ * @param own - does the new Transport own the next
+ * @throws ActiveMQException if an error occurs.
*/
- virtual Transport* createTransport(
- const activemq::util::Properties& properties );
+ virtual Transport* createTransport(
+ const activemq::util::Properties& properties,
+ Transport* next,
+ bool own ) throw ( exceptions::ActiveMQException );
/**
* Returns a reference to this TransportFactory
@@ -52,7 +56,7 @@
static TransportFactory& getInstance(void);
};
-
+
}}
#endif /*ACTIVEMQ_TRANSPORT_IOTRANSPORTFACTORY_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Response.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Response.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Response.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Response.h Thu Apr 12 13:29:39 2007
@@ -32,7 +32,7 @@
class Response : public Command{
public:
- virtual ~Response(void) {}
+ virtual ~Response() {}
/**
* Gets the Correlation Id that is associated with this message
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h Thu Apr 12 13:29:39 2007
@@ -106,6 +106,7 @@
*/
virtual void setTransportExceptionListener(
TransportExceptionListener* listener ) = 0;
+
};
}}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportBuilder.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportBuilder.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportBuilder.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportBuilder.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TransportBuilder.h"
+
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/transport/TransportFactoryMap.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+TransportBuilder::TransportBuilder()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TransportBuilder::~TransportBuilder()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Transport* TransportBuilder::buildTransport( const std::string& url,
+ util::Properties& properties )
+ throw ( cms::CMSException ) {
+
+ try{
+
+ Transport* transport = NULL;
+
+ // Parse out the properties from the URI
+ parseURL( url, properties );
+
+ // Create the Base IO Transport
+ transport = this->createTransport(
+ "transport.IOTransport", properties );
+
+ // Create the Transport for our protocol
+ transport = this->createTransport(
+ properties.getProperty( "transport.protocol", "tcp" ),
+ properties,
+ transport );
+
+ // If async sends are enabled, wrap the transport with a AsyncSendTransport
+ // do this before the response correlator so that all commands go out on the
+ // send message queue, otherwise messages could get sent out of order.
+ if( properties.getProperty( "transport.useAsyncSend", "false" ) == "true" ) {
+ // Create the Transport for response correlator
+ transport = this->createTransport(
+ "transport.filters.AsyncSendTransport", properties, transport );
+ }
+
+ // Create the Transport for response correlator
+ transport = this->createTransport(
+ "transport.filters.ResponseCorrelator", properties, transport );
+
+ // If command tracing was enabled, wrap the transport with a logging transport.
+ if( properties.getProperty( "transport.commandTracingEnabled", "false" ) == "true" ) {
+ // Create the Transport for response correlator
+ transport = this->createTransport(
+ "transport.filters.LoggingTransport", properties, transport );
+ }
+
+ return transport;
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TransportBuilder::parseURL( const std::string& URI,
+ util::Properties& properties )
+ throw ( exceptions::IllegalArgumentException ) {
+
+ try
+ {
+ StringTokenizer tokenizer( URI, ":/" );
+
+ std::vector<std::string> tokens;
+
+ // Require that there be three tokens at the least, these are
+ // transport, url, port.
+ if( tokenizer.countTokens() < 3 )
+ {
+ throw exceptions::IllegalArgumentException(
+ __FILE__, __LINE__,
+ (string( "TransportBuilder::parseURL - "
+ "Marlformed URI: ") + URI).c_str() );
+ }
+
+ // First element should be the Transport Type, following that is the
+ // URL and any params.
+ properties.setProperty( "transport.protocol", tokenizer.nextToken() );
+
+ // Parse URL and Port as one item, optional params follow the ?
+ // and then each param set is delimited with & we extract first
+ // three chars as they are the left over ://
+ properties.setProperty( "transport.uri", tokenizer.nextToken("&?").substr(3) );
+
+ // Now get all the optional parameters and store them as properties
+ int count = tokenizer.toArray( tokens );
+
+ for( int i = 0; i < count; ++i )
+ {
+ tokenizer.reset( tokens[i], "=" );
+
+ if( tokenizer.countTokens() != 2 )
+ {
+ throw exceptions::IllegalArgumentException(
+ __FILE__, __LINE__,
+ ( string( "TransportBuilder::parseURL - "
+ "Marlformed Parameter = " ) + tokens[i] ).c_str() );
+ }
+
+ // Get them in order, passing both as nextToken calls in the
+ // set Property can cause reversed order.
+ string key = tokenizer.nextToken();
+ string value = tokenizer.nextToken();
+
+ // Store this param as a property
+ properties.setProperty( key, value );
+ }
+ }
+ AMQ_CATCH_RETHROW( IllegalArgumentException )
+ AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IllegalArgumentException )
+ AMQ_CATCHALL_THROW( IllegalArgumentException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Transport* TransportBuilder::createTransport( const std::string& name,
+ const util::Properties& properties,
+ Transport* next )
+ throw ( cms::CMSException ) {
+
+ try{
+
+ // Create the Transport that the Connector will use.
+ TransportFactory* factory =
+ TransportFactoryMap::getInstance().lookup( name );
+ if( factory == NULL ){
+ throw ActiveMQException(
+ __FILE__, __LINE__,
+ ( string( "TransportBuilder::createTransport - " ) +
+ string( "unknown transport factory" ) + name ).c_str() );
+ }
+
+ // Create the transport.
+ Transport* transport = factory->createTransport( properties, next, true );
+ if( transport == NULL ){
+ throw ActiveMQException(
+ __FILE__, __LINE__,
+ ( string( "TransportBuilder::createTransport - " ) +
+ string( "failed creating new Transport" ) + name ).c_str() );
+ }
+
+ return transport;
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportBuilder.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportBuilder.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportBuilder.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_TRANSPORTBUILDER_H_
+#define _ACTIVEMQ_TRANSPORT_TRANSPORTBUILDER_H_
+
+#include <activemq/transport/Transport.h>
+#include <activemq/util/Properties.h>
+#include <activemq/exceptions/IllegalArgumentException.h>
+
+namespace activemq{
+namespace transport{
+
+ /**
+ * This class controls the creation of transports based on configuration
+ * options set in a properties object.
+ */
+ class TransportBuilder
+ {
+ public:
+
+ TransportBuilder();
+ virtual ~TransportBuilder();
+
+ public:
+
+ /**
+ * Builds a transport from the passed URL, filling in the properties
+ * object that its passed with any properties that it finds in the
+ * URL. Any properties that the caller sets before calling this method
+ * are preserved.
+ * @param url - URL of the Broker
+ * @param properties - Properties object to fill
+ * @returns pointer to a new Transport, caller owns.
+ * @throws CMSException on failure.
+ */
+ virtual Transport* buildTransport( const std::string& url,
+ util::Properties& properties )
+ throw ( cms::CMSException );
+
+ protected:
+
+ /**
+ * Parses the properties out of the provided Broker URI and sets
+ * them in the passed Properties Object.
+ * @param URI a Broker URI to parse
+ * @param properties a Properties object to set the parsed values in
+ * @throws IllegalArgumentException if the passed URI is invalid
+ */
+ virtual void parseURL( const std::string& URI,
+ util::Properties& properties )
+ throw ( exceptions::IllegalArgumentException );
+
+ /**
+ * Given a Transport Name and the properties it should use to configure
+ * itself, create it. If the name cannot be linked to a transport
+ * factory then an exception is thrown.
+ * @param name - Name of the Transport to Create
+ * @param next - Next Transport in the chain.
+ * @param properties - Properties to configure the transport
+ * @returns a newly created transport.
+ * @throws CMSException if an error occurs during creation.
+ */
+ virtual Transport* createTransport( const std::string& name,
+ const util::Properties& properties,
+ Transport* next = NULL )
+ throw ( cms::CMSException );
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_TRANSPORTBUILDER_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportExceptionListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportExceptionListener.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportExceptionListener.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportExceptionListener.h Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#ifndef ACTIVEMQ_TRANSPORT_TRANSPORTEXCEPTIONLISTENER_H_
#define ACTIVEMQ_TRANSPORT_TRANSPORTEXCEPTIONLISTENER_H_
@@ -22,29 +22,29 @@
namespace activemq{
namespace transport{
-
+
// Forward declarations.
class Transport;
-
+
/**
* A listener of asynchronous exceptions from a command transport object.
*/
class TransportExceptionListener{
public:
-
- virtual ~TransportExceptionListener(){}
-
+
+ virtual ~TransportExceptionListener() {}
+
/**
* Event handler for an exception from a command transport.
* @param source The source of the exception
* @param ex The exception.
*/
- virtual void onTransportException(
- Transport* source,
+ virtual void onTransportException(
+ Transport* source,
const exceptions::ActiveMQException& ex ) = 0;
-
+
};
-
+
}}
#endif /*ACTIVEMQ_TRANSPORT_TRANSPORTEXCEPTIONLISTENER_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactory.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactory.h Thu Apr 12 13:29:39 2007
@@ -18,30 +18,39 @@
#ifndef ACTIVEMQ_TRANSPORT_TRANSPORTFACTORY_H_
#define ACTIVEMQ_TRANSPORT_TRANSPORTFACTORY_H_
-#include <activemq/transport/IOTransport.h>
-#include <activemq/transport/ResponseCorrelator.h>
-#include <activemq/io/InputStream.h>
-#include <activemq/io/OutputStream.h>
-#include <activemq/exceptions/IllegalArgumentException.h>
+#include <activemq/transport/Transport.h>
#include <activemq/util/Properties.h>
+#include <activemq/util/Config.h>
namespace activemq{
namespace transport{
-
+
+ /**
+ * Defines the interface for Factories that create Transports or
+ * TransportFilters. Since Transports can be chained, the create
+ * method takes a pointer to the next transport in the list, and
+ * wether the newly create transport owns the next and should delete
+ * it on its own destruction.
+ */
class TransportFactory{
public:
-
- virtual ~TransportFactory(void){}
-
+
+ virtual ~TransportFactory() {}
+
/**
* Creates a Transport instance.
- * @param properties Object that will hold transport values
+ * @param properties - Object that will hold transport config values
+ * @param next - the next transport in the chain, or NULL
+ * @param own - does the new Transport own the next
+ * @throws ActiveMQexception if an error occurs
*/
virtual Transport* createTransport(
- const activemq::util::Properties& properties ) = 0;
+ const activemq::util::Properties& properties,
+ Transport* next = NULL,
+ bool own = true ) throw ( exceptions::ActiveMQException ) = 0;
};
-
+
}}
#endif /*ACTIVEMQ_TRANSPORT_TRANSPORTFACTORY_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactoryMap.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactoryMap.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactoryMap.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactoryMap.cpp Thu Apr 12 13:29:39 2007
@@ -22,42 +22,44 @@
using namespace std;
////////////////////////////////////////////////////////////////////////////////
-TransportFactoryMap& TransportFactoryMap::getInstance(void)
+TransportFactoryMap& TransportFactoryMap::getInstance()
{
// Static instance of this Map, create here so that one will
- // always exist, the one and only Connector Map.
+ // always exist, the one and only Connector Map.
static TransportFactoryMap instance;
-
+
return instance;
-}
+}
////////////////////////////////////////////////////////////////////////////////
-void TransportFactoryMap::registerTransportFactory(
- const std::string& name,
+void TransportFactoryMap::registerTransportFactory(
+ const std::string& name,
TransportFactory* factory )
{
factoryMap[name] = factory;
}
////////////////////////////////////////////////////////////////////////////////
-TransportFactoryMap::TransportFactoryMap(
-const TransportFactoryMap& factoryMap AMQCPP_UNUSED ){
+TransportFactoryMap::TransportFactoryMap(
+ const TransportFactoryMap& factoryMap AMQCPP_UNUSED ){
}
-
+
////////////////////////////////////////////////////////////////////////////////
-TransportFactoryMap& TransportFactoryMap::operator=(
-const TransportFactoryMap& factoryMap AMQCPP_UNUSED ){
+TransportFactoryMap& TransportFactoryMap::operator=(
+ const TransportFactoryMap& factoryMap AMQCPP_UNUSED ){
+
return *this;
}
+
////////////////////////////////////////////////////////////////////////////////
void TransportFactoryMap::unregisterTransportFactory( const std::string& name ){
factoryMap.erase( name );
}
////////////////////////////////////////////////////////////////////////////////
-TransportFactory* TransportFactoryMap::lookup( const std::string& name )
-{
- map<string, TransportFactory*>::const_iterator itr =
+TransportFactory* TransportFactoryMap::lookup( const std::string& name ) {
+
+ map<string, TransportFactory*>::const_iterator itr =
factoryMap.find(name);
if( itr != factoryMap.end() )
@@ -70,16 +72,16 @@
}
////////////////////////////////////////////////////////////////////////////////
-size_t TransportFactoryMap::getFactoryNames(
+size_t TransportFactoryMap::getFactoryNames(
std::vector< std::string >& factoryList )
-{
+{
map<string, TransportFactory*>::const_iterator itr =
factoryMap.begin();
-
+
for(; itr != factoryMap.end(); ++itr)
{
factoryList.insert( factoryList.end(), itr->first );
}
-
+
return factoryMap.size();
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactoryMap.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactoryMap.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactoryMap.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactoryMap.h Thu Apr 12 13:29:39 2007
@@ -24,70 +24,70 @@
namespace activemq{
namespace transport{
-
+
/**
* The TransportFactoryMap contains keys that map to specific versions
* of the TransportFactory class which create a particular type of
* Transport.
*/
class TransportFactoryMap{
-
+
private:
-
+
// Map of Factories
std::map<std::string, TransportFactory*> factoryMap;
private:
-
+
// Hidden Contrustor, prevents instantiation
- TransportFactoryMap() {};
-
+ TransportFactoryMap() {}
+
// Hidden Destructor.
- virtual ~TransportFactoryMap() {};
-
+ virtual ~TransportFactoryMap() {}
+
// Hidden Copy Constructore
TransportFactoryMap( const TransportFactoryMap& factoryMap );
-
+
// Hidden Assignment operator
TransportFactoryMap& operator=( const TransportFactoryMap& factoryMap );
-
+
public:
-
+
/**
* Gets a singleton instance of this class.
*/
- static TransportFactoryMap& getInstance(void);
-
+ static TransportFactoryMap& getInstance();
+
/**
* Registers a new Transport Factory with this map
* @param name to associate the factory with
* @param factory to store.
*/
- void registerTransportFactory( const std::string& name,
+ void registerTransportFactory( const std::string& name,
TransportFactory* factory );
-
+
/**
* Unregisters a Transport Factory with this map
* @param name of the factory to remove
*/
void unregisterTransportFactory( const std::string& name );
-
+
/**
* Lookup the named factory in the Map
* @param name the factory name to lookup
* @return the factory assciated with the name, or NULL
*/
TransportFactory* lookup( const std::string& name );
-
+
/**
* Fetch a list of factory names that this Map contains
* @param factoryList vector object to receive the list
* @returns count of factories.
*/
std::size_t getFactoryNames( std::vector< std::string >& factoryList );
-
+
};
-
+
}}
#endif /*ACTIVEMQ_TRANSPORT_TRANSPORTFACTORYMAP_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactoryMapRegistrar.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactoryMapRegistrar.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactoryMapRegistrar.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactoryMapRegistrar.h Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#ifndef ACTIVEMQ_TRANSPORT_TRANSPORTFACTORYMAPREGISTRAR_H_
#define ACTIVEMQ_TRANSPORT_TRANSPORTFACTORYMAPREGISTRAR_H_
@@ -30,36 +30,36 @@
class TransportFactoryMapRegistrar
{
public:
-
- /**
+
+ /**
* Constructor for this class
* @param name of the factory to register
* @param factory responsible for creating the transport
- * @param manageLifetime boolean indicating if this object manages the
+ * @param manageLifetime boolean indicating if this object manages the
* lifetime of the factory that is being registered.
*/
- TransportFactoryMapRegistrar( const std::string& name,
- TransportFactory* factory,
- bool manageLifetime = true)
- {
+ TransportFactoryMapRegistrar( const std::string& name,
+ TransportFactory* factory,
+ bool manageLifetime = true)
+ {
// Register it in the map.
TransportFactoryMap::getInstance().
- registerTransportFactory(name, factory);
+ registerTransportFactory( name, factory );
- // Store for later deletion
+ // Store for later deletion
this->factory = factory;
this->manageLifetime = manageLifetime;
this->name = name;
}
-
- virtual ~TransportFactoryMapRegistrar(void)
+
+ virtual ~TransportFactoryMapRegistrar()
{
try
{
// UnRegister it in the map.
TransportFactoryMap::getInstance().
unregisterTransportFactory( name );
-
+
if( manageLifetime )
{
delete factory;
@@ -67,24 +67,24 @@
}
catch(...) {}
}
-
+
/**
* Return a reference to the factory object that is contained in this
* registrar.
* @return TransportFactory reference
*/
- virtual TransportFactory& getFactory(void) {
+ virtual TransportFactory& getFactory() {
return *factory;
}
-
+
private:
-
- std::string name;
+
+ std::string name;
TransportFactory* factory;
- bool manageLifetime;
+ bool manageLifetime;
+
+ };
- };
-
}}
#endif /*ACTIVEMQ_TRANSPORT_TRANSPORTFACTORYMAPREGISTRAR_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp Thu Apr 12 13:29:39 2007
@@ -22,9 +22,32 @@
using namespace activemq::transport;
////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::onTransportException(
- Transport* source AMQCPP_UNUSED,
- const exceptions::ActiveMQException& ex )
-{
+TransportFilter::TransportFilter( Transport* next, const bool own ) {
+
+ this->next = next;
+ this->own = own;
+
+ commandlistener = NULL;
+ exceptionListener = NULL;
+
+ // Observe the nested transport for events.
+ next->setCommandListener( this );
+ next->setTransportExceptionListener( this );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TransportFilter::~TransportFilter() {
+
+ if( own ){
+ delete next;
+ next = NULL;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TransportFilter::onTransportException(
+ Transport* source AMQCPP_UNUSED,
+ const exceptions::ActiveMQException& ex ) {
+
fire( ex );
}