You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/04/12 22:29:43 UTC
svn commit: r528222 [3/5] - in
/activemq/activemq-cpp/trunk/activemq-cpp/src: examples/ main/
main/activemq/connector/openwire/
main/activemq/connector/openwire/commands/ main/activemq/connector/stomp/
main/activemq/connector/stomp/commands/ main/activ...
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#ifndef ACTIVEMQ_TRANSPORT_TRANSPORTFILTER_H_
#define ACTIVEMQ_TRANSPORT_TRANSPORTFILTER_H_
@@ -25,43 +25,43 @@
namespace activemq{
namespace transport{
-
+
/**
* A filter on the transport layer. Transport
- * filters implement the Transport interface and
+ * filters implement the Transport interface and
* optionally delegate calls to another Transport object.
*/
- class TransportFilter
- :
+ class TransportFilter
+ :
public Transport,
public CommandListener,
public TransportExceptionListener
{
- protected:
-
+ protected:
+
/**
* The transport that this filter wraps around.
*/
Transport* next;
-
+
/**
* Flag to indicate whether this object controls
* the lifetime of the next transport object.
*/
bool own;
-
+
/**
* Listener to incoming commands.
*/
CommandListener* commandlistener;
-
+
/**
* Listener of exceptions from this transport.
*/
TransportExceptionListener* exceptionListener;
-
+
protected:
-
+
/**
* Notify the excpetion listener
* @param ex - the exception to send to listeners
@@ -69,55 +69,37 @@
void fire( const exceptions::ActiveMQException& ex ){
if( exceptionListener != NULL ){
-
+
try{
exceptionListener->onTransportException( this, ex );
}catch( ... ){}
- }
+ }
}
-
+
/**
* Notify the command listener.
* @param command - the command to send to the listener
*/
void fire( Command* command ){
-
+
try{
if( commandlistener != NULL ){
commandlistener->onCommand( command );
}
}catch( ... ){}
}
-
+
public:
-
+
/**
* Constructor.
* @param next - the next Transport in the chain
* @param own - true if this filter owns the next and should delete it
*/
- TransportFilter( Transport* next, const bool own = true ){
-
- this->next = next;
- this->own = own;
-
- commandlistener = NULL;
- exceptionListener = NULL;
-
- // Observe the nested transport for events.
- next->setCommandListener( this );
- next->setTransportExceptionListener( this );
- }
-
- virtual ~TransportFilter(){
-
- if( own ){
- delete next;
- next = NULL;
- }
-
- }
-
+ TransportFilter( Transport* next, const bool own = true );
+
+ virtual ~TransportFilter();
+
/**
* Event handler for the receipt of a command.
* @param command - the received command object.
@@ -125,14 +107,14 @@
virtual void onCommand( Command* command ){
fire( command );
}
-
+
/**
* Event handler for an exception from a command transport.
* @param source The source of the exception
* @param ex The exception.
*/
virtual void onTransportException( Transport* source, const exceptions::ActiveMQException& ex );
-
+
/**
* Sends a one-way command. Does not wait for any response from the
* broker.
@@ -145,7 +127,7 @@
virtual void oneway( Command* command ) throw(CommandIOException, exceptions::UnsupportedOperationException){
next->oneway( command );
}
-
+
/**
* Not supported by this class - throws an exception.
* @param command the command that is sent as a request
@@ -154,7 +136,7 @@
virtual Response* request( Command* command ) throw(CommandIOException, exceptions::UnsupportedOperationException){
return next->request( command );
}
-
+
/**
* Assigns the command listener for non-response commands.
* @param listener the listener.
@@ -162,7 +144,7 @@
virtual void setCommandListener( CommandListener* listener ){
this->commandlistener = listener;
}
-
+
/**
* Sets the command reader.
* @param reader the object that will be used for reading command objects.
@@ -170,7 +152,7 @@
virtual void setCommandReader( CommandReader* reader ){
next->setCommandReader( reader );
}
-
+
/**
* Sets the command writer.
* @param writer the object that will be used for writing command objects.
@@ -178,7 +160,7 @@
virtual void setCommandWriter( CommandWriter* writer ){
next->setCommandWriter( writer );
}
-
+
/**
* Sets the observer of asynchronous exceptions from this transport.
* @param listener the listener of transport exceptions.
@@ -186,7 +168,7 @@
virtual void setTransportExceptionListener( TransportExceptionListener* listener ){
this->exceptionListener = listener;
}
-
+
/**
* Starts this transport object and creates the thread for
* polling on the input stream for commands. If this object
@@ -197,21 +179,21 @@
* has already been closed.
*/
virtual void start() throw( cms::CMSException ){
-
+
if( commandlistener == NULL ){
throw exceptions::ActiveMQException( __FILE__, __LINE__,
"commandListener is invalid" );
}
-
+
if( exceptionListener == NULL ){
throw exceptions::ActiveMQException( __FILE__, __LINE__,
"exceptionListener is invalid" );
}
-
+
// Start the delegate transport object.
next->start();
}
-
+
/**
* Stops the polling thread and closes the streams. This can
* be called explicitly, but is also called in the destructor. Once
@@ -219,12 +201,12 @@
* @throws CMSException if errors occur.
*/
virtual void close() throw( cms::CMSException ){
-
+
next->close();
}
-
+
};
-
+
}}
#endif /*ACTIVEMQ_TRANSPORT_TRANSPORTFILTER_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AsyncSendTransport.h"
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+using namespace activemq::util;
+using namespace activemq::exceptions;
+using namespace activemq::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+AsyncSendTransport::AsyncSendTransport( Transport* next, bool own )
+ : TransportFilter( next, own )
+{
+ this->closed = true; // stays "closed" until start() clears the flag
+ this->asyncThread = NULL; // worker thread is created later by startThread()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AsyncSendTransport::~AsyncSendTransport()
+{
+ try { // close() is declared to throw; the NOTHROW macros below presumably swallow it (macros defined elsewhere - confirm)
+ close(); // stops the worker thread, purges the queue and closes next
+ }
+ AMQ_CATCH_NOTHROW( ActiveMQException )
+ AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::oneway( Command* command )
+ throw( CommandIOException, exceptions::UnsupportedOperationException ) {
+
+ try{
+
+ // Enqueue a clone of the command for the worker thread (run) to
+ // dispatch. Cloning means the caller may delete its own copy
+ // before the send actually happens.
+ synchronized( &msgQueue ) {
+ msgQueue.push( command->cloneCommand() ); // the clone is deleted later by run() or purgeMessages()
+ msgQueue.notifyAll(); // wake run() if it is waiting on an empty queue
+ }
+ }
+ AMQ_CATCH_RETHROW( CommandIOException )
+ AMQ_CATCH_RETHROW( UnsupportedOperationException )
+ AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::start() throw( cms::CMSException ) {
+
+ try {
+
+ this->closed = false; // cleared before the thread starts because run() loops while !closed
+
+ // Start the worker thread that drains the send queue
+ this->startThread();
+
+ next->start(); // then start the wrapped transport; unlike TransportFilter::start(), no listener NULL checks are made here
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::close() throw( cms::CMSException ) {
+
+ try {
+
+ this->closed = true; // run() checks this flag and exits; NOTE(review): written without holding the msgQueue lock
+
+ // Wake and join the worker thread, then purge unsent commands
+ this->stopThread();
+
+ next->close(); // finally close the wrapped transport
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::run()
+{
+ try{
+
+ while( !closed )
+ {
+ Command* command = NULL;
+
+ synchronized( &msgQueue )
+ {
+ // Guard against spurious wakeups or a race for the sync lock:
+ // keep waiting while the queue is empty and the transport
+ // is still open. stopThread() calls notifyAll() on the queue
+ // to wake this thread after close() has set the closed flag.
+ while( msgQueue.empty() )
+ {
+ if( closed )
+ {
+ break;
+ }
+ msgQueue.wait();
+ }
+
+ // don't want to process messages if we are shutting down.
+ if( closed )
+ {
+ return; // anything still queued is left for purgeMessages()
+ }
+
+ // get the data
+ command = msgQueue.pop();
+ }
+
+ // Dispatch the message
+ next->oneway( command ); // NOTE(review): if this throws, the delete below is skipped and the clone is not freed here
+
+ // Destroy Our copy of the message
+ delete command;
+ }
+ }
+ catch(...) // the try encloses the whole loop, so after this handler run() returns
+ {
+ this->fire( ActiveMQException(
+ __FILE__, __LINE__,
+ "AsyncSendTransport::run - "
+ "Connector threw an unknown Exception, recovering..." ) );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::startThread() throw ( ActiveMQException ) {
+
+ try
+ {
+ // Start the thread, if it's not already started.
+ if( asyncThread == NULL ) // NULL means no thread exists: set in the constructor and again by stopThread()
+ {
+ asyncThread = new Thread( this ); // this object is passed as the Runnable for the thread
+ asyncThread->start();
+ }
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::stopThread() throw ( ActiveMQException ) {
+
+ try
+ {
+ // if the thread exists, wake it so it can see the closed flag
+ // (set by close() before this call), then wait for run to return
+ if( asyncThread != NULL )
+ {
+ synchronized( &msgQueue )
+ {
+ // Force a wakeup if run is in a wait.
+ msgQueue.notifyAll();
+ }
+
+ // Wait for it to die and then delete it.
+ asyncThread->join();
+ delete asyncThread;
+ asyncThread = NULL; // lets startThread() create a fresh thread later
+ }
+
+ // Delete any commands still queued (done even if no thread existed)
+ purgeMessages();
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AsyncSendTransport::purgeMessages() throw ( ActiveMQException )
+{
+ try
+ {
+ synchronized( &msgQueue )
+ {
+ while( !msgQueue.empty() )
+ {
+ // The queue holds clones created in oneway(), so each
+ // entry still here was never sent and has to be
+ // deleted by this transport.
+ delete msgQueue.pop();
+ }
+ }
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransport.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FILTERS_ASYNCSENDTRANSPORT_H_
+#define _ACTIVEMQ_TRANSPORT_FILTERS_ASYNCSENDTRANSPORT_H_
+
+#include <activemq/transport/TransportFilter.h>
+#include <activemq/util/Queue.h>
+#include <activemq/concurrent/Runnable.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+ class AsyncSendTransport : public TransportFilter,
+ public concurrent::Runnable {
+ private:
+
+ /**
+ * Thread that runs run() and sends the commands queued by oneway.
+ */
+ concurrent::Thread* asyncThread; // NOTE(review): concurrent/Thread.h is not included directly by this header
+
+ /**
+ * Outgoing Message Queue
+ */
+ util::Queue<Command*> msgQueue;
+
+ /**
+ * boolean indicating that this transport is closed
+ */
+ bool closed;
+
+ public:
+
+ /**
+ * Constructor.
+ * @param next - the next Transport in the chain
+ * @param own - true if this filter owns the next and should delete it
+ */
+ AsyncSendTransport( Transport* next, bool own = true );
+
+ virtual ~AsyncSendTransport();
+
+ /**
+ * Queues a clone of the command; the worker thread sends it later.
+ * The caller keeps ownership of the command passed in.
+ * @param command the command to be sent.
+ * @throws CommandIOException if an exception occurs while queuing
+ * the command.
+ * @throws UnsupportedOperationException if this method is not implemented
+ * by this transport.
+ */
+ virtual void oneway( Command* command )
+ throw( CommandIOException, exceptions::UnsupportedOperationException );
+
+ /**
+ * Starts this transport: marks it as open, starts the thread
+ * that sends queued commands (if it is not already running),
+ * and then starts the next transport in the chain. Calling
+ * start on a closed transport re-opens it; no exception is
+ * thrown just because close was called earlier.
+ * @throws CMSException if starting the thread or the next
+ * transport fails.
+ */
+ virtual void start() throw( cms::CMSException );
+
+ /**
+ * Marks this transport closed, stops the sending thread, deletes
+ * any commands still queued, and closes the next transport. This
+ * can be called explicitly, and is also called by the destructor.
+ * @throws CMSException if errors occur.
+ */
+ virtual void close() throw( cms::CMSException );
+
+ protected:
+
+ /**
+ * Run method that is called from the Thread class when this object
+ * is registered with a Thread and started. This function takes
+ * commands from the outgoing message queue, sends each one via the
+ * next transport's oneway, and then deletes it.
+ */
+ virtual void run();
+
+ /**
+ * Creates and starts the thread that executes run(), unless a
+ * thread already exists. This is called from start(); the
+ * thread then sends commands queued by oneway in the
+ * background.
+ */
+ void startThread() throw ( exceptions::ActiveMQException );
+
+ /**
+ * Wakes, joins and deletes the sending thread if one exists, then purges the queue.
+ */
+ void stopThread() throw ( exceptions::ActiveMQException );
+
+ /**
+ * Deletes all commands currently in the outgoing queue. This is
+ * done when the sending thread is stopped (see stopThread).
+ */
+ virtual void purgeMessages() throw ( exceptions::ActiveMQException );
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FILTERS_ASYNCSENDTRANSPORT_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AsyncSendTransportFactory.h"
+
+#include <activemq/transport/filters/AsyncSendTransport.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+TransportFactory& AsyncSendTransportFactory::getInstance(void)
+{
+ // Function-local static: the registrar and the factory it is given are created on the first call only
+ static TransportFactoryMapRegistrar registrar(
+ "transport.filters.AsyncSendTransport", // name handed to the registrar together with the factory
+ new AsyncSendTransportFactory() );
+
+ return registrar.getFactory();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Transport* AsyncSendTransportFactory::createTransport(
+ const activemq::util::Properties& properties AMQCPP_UNUSED, // not read by this factory
+ Transport* next,
+ bool own ) throw ( ActiveMQException ) {
+
+ try{
+ return new AsyncSendTransport( next, own ); // wraps next in a new filter; own is forwarded unchanged
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/AsyncSendTransportFactory.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FILTERS_ASYNCSENDTRANSPORTFACTORY_H_
+#define _ACTIVEMQ_TRANSPORT_FILTERS_ASYNCSENDTRANSPORTFACTORY_H_
+
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/transport/TransportFactoryMapRegistrar.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+ /**
+ * Factory responsible for creating AsyncSendTransport filters.
+ */
+ class AsyncSendTransportFactory : public TransportFactory {
+
+ public:
+
+ virtual ~AsyncSendTransportFactory() {}
+
+ /**
+ * Creates a new AsyncSendTransport that wraps the given transport.
+ * @param properties - transport config values (not used by this factory)
+ * @param next - the next transport in the chain, which the new filter wraps
+ * @param own - does the new Transport own the next
+ * @throws ActiveMQException if an error occurs.
+ */
+ virtual Transport* createTransport(
+ const activemq::util::Properties& properties,
+ Transport* next,
+ bool own ) throw ( exceptions::ActiveMQException );
+
+ /**
+ * Returns the shared factory instance, which is held by a static registrar.
+ * @returns TransportFactory Reference
+ */
+ static TransportFactory& getInstance();
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FILTERS_ASYNCSENDTRANSPORTFACTORY_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/FutureResponse.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/FutureResponse.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/FutureResponse.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/FutureResponse.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_FILTERS_FUTURERESPONSE_H_
+#define ACTIVEMQ_TRANSPORT_FILTERS_FUTURERESPONSE_H_
+
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/transport/Response.h>
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+ /**
+ * A container that holds a response object. Since this
+ * object is Synchronizable, callers can wait on this object
+ * and when a response comes in, notify can be called to
+ * inform those waiting that the response is now available.
+ */
+ class FutureResponse : public concurrent::Synchronizable{
+ private:
+
+ Response* response; // NULL until setResponse is called
+ concurrent::Mutex mutex; // every Synchronizable call below is forwarded to this mutex
+
+ public:
+
+ FutureResponse(){
+ response = NULL;
+ }
+
+ virtual ~FutureResponse(){} // does not delete the stored response
+
+ /**
+ * Locks the object.
+ * @throws ActiveMQException
+ */
+ virtual void lock() throw( exceptions::ActiveMQException ){
+ mutex.lock();
+ }
+
+ /**
+ * Unlocks the object.
+ * @throws ActiveMQException
+ */
+ virtual void unlock() throw( exceptions::ActiveMQException ){
+ mutex.unlock();
+ }
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to Notify. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void wait() throw( exceptions::ActiveMQException ){
+ mutex.wait();
+ }
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to Notify. Must have this object locked before
+ * calling. This wait will timeout after the specified time
+ * interval.
+ * @param millisecs time in milliseconds to wait, or WAIT_INFINITE
+ * @throws ActiveMQException
+ */
+ virtual void wait( unsigned long millisecs )
+ throw( exceptions::ActiveMQException )
+ {
+ mutex.wait( millisecs );
+ }
+
+ /**
+ * Signals a waiter on this object that it can now wake
+ * up and continue. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void notify() throw( exceptions::ActiveMQException ){
+ mutex.notify();
+ }
+
+ /**
+ * Signals the waiters on this object that it can now wake
+ * up and continue. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void notifyAll() throw( exceptions::ActiveMQException ){
+ mutex.notifyAll();
+ }
+
+ /**
+ * Getters for the response property.
+ * @return the response object for the request, or NULL if none has been set
+ */
+ virtual const Response* getResponse() const{
+ return response;
+ }
+ virtual Response* getResponse(){
+ return response;
+ }
+
+ /**
+ * Setter for the response property.
+ * @param response the response object for the request.
+ */
+ virtual void setResponse( Response* response ){
+ this->response = response; // stores the pointer only; waiters are not notified here
+ }
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_FILTERS_FUTURERESPONSE_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "LoggingTransport.h"
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+
+LOGCMS_INITIALIZE( logger, LoggingTransport, "activemq.transport.filters.LoggingTransport") // presumably defines the logger declared via LOGCMS_DECLARE in the class - confirm against LoggerDefines.h
+
+////////////////////////////////////////////////////////////////////////////////
+LoggingTransport::LoggingTransport( Transport* next, bool own )
+ : TransportFilter( next, own )
+{} // no state of its own; everything is set up by the base filter
+
+////////////////////////////////////////////////////////////////////////////////
+void LoggingTransport::onCommand( Command* command ) {
+
+ ostringstream ostream; // the whole entry is built first and logged with a single call
+ ostream << "*** BEGIN RECEIVED ASYNCHRONOUS COMMAND ***" << endl;
+ ostream << command->toString() << endl; // command is dereferenced without a NULL check
+ ostream << "*** END RECEIVED ASYNCHRONOUS COMMAND ***";
+
+ LOGCMS_INFO( logger, ostream.str() );
+
+ // Forward the command to this filter's listener via the base class.
+ TransportFilter::onCommand( command );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void LoggingTransport::oneway( Command* command )
+ throw(CommandIOException, exceptions::UnsupportedOperationException) {
+
+ try {
+
+ ostringstream ostream; // the command is logged before it is handed on
+ ostream << "*** BEGIN SENDING ONEWAY COMMAND ***" << endl;
+ ostream << command->toString() << endl; // command is dereferenced without a NULL check
+ ostream << "*** END SENDING ONEWAY COMMAND ***";
+
+ LOGCMS_INFO( logger, ostream.str() );
+
+ // Pass the command on to the next transport via the base class.
+ TransportFilter::oneway( command );
+ }
+ AMQ_CATCH_RETHROW( CommandIOException )
+ AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+ AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Response* LoggingTransport::request( Command* command )
+ throw(CommandIOException, exceptions::UnsupportedOperationException) {
+
+ try {
+
+ // Send first: the response is needed for the log entry below.
+ Response* response = TransportFilter::request( command );
+
+ ostringstream ostream; // NOTE(review): both banners are written only after the request has completed
+ ostream << "*** SENDING REQUEST COMMAND ***" << endl;
+ ostream << command->toString() << endl;
+ ostream << "*** RECEIVED RESPONSE COMMAND ***" << endl;
+ ostream << ( response == NULL? "NULL" : response->toString() ); // a NULL response is logged as the text "NULL"
+
+ LOGCMS_INFO( logger, ostream.str() );
+
+ return response; // returned to the caller unchanged
+ }
+ AMQ_CATCH_RETHROW( CommandIOException )
+ AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+ AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+
+
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransport.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_FILTERS_LOGGINGTRANSPORT_H_
+#define ACTIVEMQ_TRANSPORT_FILTERS_LOGGINGTRANSPORT_H_
+
+#include <activemq/transport/TransportFilter.h>
+#include <activemq/logger/LoggerDefines.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+ /**
+ * A transport filter that logs commands as they are sent/received.
+ */
+ class LoggingTransport : public TransportFilter
+ {
+ private:
+
+ LOGCMS_DECLARE( logger )
+
+ public:
+
+ /**
+ * Constructor.
+ * @param next - the next Transport in the chain
+ * @param own - true if this filter owns the next and should delete it
+ */
+ LoggingTransport( Transport* next, bool own = true );
+
+ virtual ~LoggingTransport() {}
+
+ /**
+ * Logs the received command, then passes it to the base filter's handler.
+ * @param command - the received command object.
+ */
+ virtual void onCommand( Command* command );
+
+ /**
+ * Logs the command and then sends it one-way through the base
+ * filter. Does not wait for any response from the broker.
+ * @param command the command to be sent.
+ * @throws CommandIOException if an exception occurs during writing of
+ * the command.
+ * @throws UnsupportedOperationException if this method is not implemented
+ * by this transport.
+ */
+ virtual void oneway( Command* command ) throw(CommandIOException, exceptions::UnsupportedOperationException);
+
+ /**
+ * Sends the request through the base filter, then logs the command and its response.
+ * @param command the command that is sent as a request
+ * @throws UnsupportedOperationException.
+ */
+ virtual Response* request( Command* command ) throw(CommandIOException, exceptions::UnsupportedOperationException);
+
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_FILTERS_LOGGINGTRANSPORT_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "LoggingTransportFactory.h"
+
+#include <activemq/transport/filters/LoggingTransport.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+TransportFactory& LoggingTransportFactory::getInstance(void)
+{
+    // Function-local static: the registrar is constructed only once.
+    static TransportFactoryMapRegistrar registrar(
+        "transport.filters.LoggingTransport",
+        new LoggingTransportFactory() );
+    TransportFactory& factory = registrar.getFactory();
+    return factory;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Transport* LoggingTransportFactory::createTransport(
+    const activemq::util::Properties& properties AMQCPP_UNUSED,
+    Transport* next, bool own ) throw ( ActiveMQException ) {
+    // properties is not consulted here: the filter is built from next and own.
+    try {
+        Transport* loggingFilter = new LoggingTransport( next, own );
+        return loggingFilter;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/LoggingTransportFactory.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FILTERS_LOGGINGTRANSPORTFACTORY_H_
+#define _ACTIVEMQ_TRANSPORT_FILTERS_LOGGINGTRANSPORTFACTORY_H_
+
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/transport/TransportFactoryMapRegistrar.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+ /**
+ * Factory responsible for creating LoggingTransport instances.
+ */
+ class LoggingTransportFactory : public TransportFactory {
+
+ public:
+
+ virtual ~LoggingTransportFactory() {}
+
+ /**
+ * Creates a LoggingTransport that wraps the given next transport.
+ * @param properties - transport config values (not used by this factory)
+ * @param next - the next transport in the chain, or NULL
+ * @param own - does the new Transport own the next
+ * @throws ActiveMQException if an error occurs.
+ */
+ virtual Transport* createTransport(
+ const activemq::util::Properties& properties,
+ Transport* next,
+ bool own ) throw ( exceptions::ActiveMQException );
+
+ /**
+ * Returns the factory held by this class's one static registrar.
+ * @return reference to that TransportFactory
+ */
+ static TransportFactory& getInstance();
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FILTERS_LOGGINGTRANSPORTFACTORY_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ResponseCorrelator.h"
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int ResponseCorrelator::getNextCommandId()
+    throw ( exceptions::ActiveMQException )
+{
+    try{
+        // Remains 0 only if the guarded block is skipped (should never happen).
+        unsigned int allocatedId = 0;
+
+        // Increment under the mutex so concurrent callers do not race on it.
+        synchronized( &commandIdMutex ){
+            allocatedId = ++nextCommandId;
+        }
+        return allocatedId;
+    }
+    AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ResponseCorrelator::ResponseCorrelator( Transport* next, bool own )
+:
+    TransportFilter( next, own ),
+    // Command ids are pre-incremented, so the first one handed out is 1.
+    nextCommandId( 0 ),
+    // Wait at most 3000 ms (3 seconds) for a response unless reconfigured.
+    maxResponseWaitTime( 3000 ),
+    // The correlator stays closed until start() succeeds.
+    closed( true )
+{
+    // All state is set up in the initializer list above.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ResponseCorrelator::~ResponseCorrelator(){
+
+    // Close the transport. close() may throw, and no exception may escape a destructor.
+    try{ close(); } catch( ... ){ /* Absorb */ }
+
+    // Don't do anything with the future responses -
+    // they should be cleaned up by each requester.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned long ResponseCorrelator::getMaxResponseWaitTime() const{
+    return this->maxResponseWaitTime;  // in milliseconds
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::setMaxResponseWaitTime( const unsigned long milliseconds ){
+    this->maxResponseWaitTime = milliseconds;  // used as the wait timeout in request()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::oneway( Command* command )
+    throw( CommandIOException, exceptions::UnsupportedOperationException ) {
+    try{
+        // Check the transport before touching the command, so that a closed
+        // correlator neither burns a command id nor modifies the command.
+        if( closed || next == NULL ){
+            throw CommandIOException( __FILE__, __LINE__,
+                "transport already closed" );
+        }
+        // One-way: stamp a fresh id and mark it as not requiring a response.
+        command->setCommandId( getNextCommandId() );
+        command->setResponseRequired( false );
+        next->oneway( command );
+    }
+    AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Response* ResponseCorrelator::request( Command* command )
+    throw( CommandIOException, exceptions::UnsupportedOperationException ) {
+    try{
+        // Same guard as oneway(): fail before registering a pending request.
+        if( closed || next == NULL ){
+            throw CommandIOException( __FILE__, __LINE__,
+                "transport already closed" );
+        }
+
+        command->setCommandId( getNextCommandId() );
+        command->setResponseRequired( true );
+
+        // The future is registered under the command id; onCommand() looks
+        // it up by the response's correlation id.
+        FutureResponse* futureResponse = new FutureResponse();
+        Response* response = NULL;
+
+        try{
+            synchronized( &mapMutex ){
+                requestMap[command->getCommandId()] = futureResponse;
+            }
+            // Send and wait while synchronized on the future; onCommand()
+            // synchronizes on the same object before it notifies.
+            synchronized( futureResponse ){
+                // Send the request.
+                next->oneway( command );
+                // Wait for the response to come in.
+                futureResponse->wait( maxResponseWaitTime );
+                // Get the response.
+                response = futureResponse->getResponse();
+            }
+        }
+        catch( ... ){
+            // The send or wait failed: unregister and free the future so it
+            // is neither leaked nor left behind in the map, then rethrow.
+            synchronized( &mapMutex ){
+                requestMap.erase( command->getCommandId() );
+                delete futureResponse;
+            }
+            throw;
+        }
+
+        // Done waiting: drop the map entry and destroy the future (safe: the
+        // reader thread only touches it while holding the map lock).
+        synchronized( &mapMutex ){
+            requestMap.erase( command->getCommandId() );
+            delete futureResponse;
+        }
+
+        if( response == NULL ){
+            throw CommandIOException( __FILE__, __LINE__,
+                "response from futureResponse was invalid" );
+        }
+        return response;
+    }
+    AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::onCommand( Command* command ) {
+
+    // Only Response objects take part in correlation; every other
+    // command is passed straight on to the listener.
+    Response* response = dynamic_cast<Response*>( command );
+    if( response == NULL ){
+        fire( command );
+        return;
+    }
+
+    // Lookup and notification both happen under the map lock, the same
+    // lock request() holds when it erases and deletes a future.
+    synchronized( &mapMutex ){
+
+        typedef std::map<unsigned int, FutureResponse*> PendingRequests;
+
+        // Find the waiting request by the response's correlation id.
+        PendingRequests::iterator pending =
+            requestMap.find( response->getCorrelationId() );
+
+        if( pending == requestMap.end() ){
+
+            // This is not terrible - just log it.
+            printf("ResponseCorrelator::onCommand() - received unknown response for request: %d\n",
+                response->getCorrelationId() );
+            return;
+        }
+
+        // Get the future response (if it's in the map, it's not NULL).
+        FutureResponse* waiter = pending->second;
+
+        synchronized( waiter ){
+
+            // Store the response on the future, then wake every
+            // thread waiting on it.
+            waiter->setResponse( response );
+
+            waiter->notifyAll();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::start() throw( cms::CMSException ) {
+
+    // Only a closed correlator needs starting; if it is already
+    // open this call does nothing.
+    if( closed ){
+
+        // Refuse to start until both listeners have been registered ...
+        if( commandlistener == NULL ){
+            throw exceptions::ActiveMQException( __FILE__, __LINE__,
+                "commandListener is invalid" );
+        }
+
+        if( exceptionListener == NULL ){
+            throw exceptions::ActiveMQException( __FILE__, __LINE__,
+                "exceptionListener is invalid" );
+        }
+
+        // ... and there is a delegate transport to start.
+        if( next == NULL ){
+            throw exceptions::ActiveMQException( __FILE__, __LINE__,
+                "next transport is NULL" );
+        }
+
+        // Bring up the delegate first; the flag is cleared only once
+        // that has succeeded, so a failed start leaves us closed.
+        next->start();
+
+        closed = false;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::close() throw( cms::CMSException ){
+
+    // Nothing to shut down: just make sure the closed flag is set.
+    if( closed || next == NULL ){ closed = true; return; }
+
+    next->close();
+    closed = true;
+}
+
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelator.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATOR_H_
+#define ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATOR_H_
+
+#include <activemq/transport/TransportFilter.h>
+#include <activemq/transport/filters/FutureResponse.h>
+#include <activemq/transport/Command.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <map>
+#include <stdio.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+ /**
+ * This type of transport filter is responsible for correlating
+ * asynchronous responses with requests. Non-response messages
+ * are simply sent directly to the CommandListener. By default
+ * (own = true) it owns the transport that it wraps.
+ */
+ class ResponseCorrelator : public TransportFilter
+ {
+ private:
+
+ /**
+ * Counter behind the command ids; it is incremented before each use.
+ */
+ unsigned int nextCommandId;
+
+ /**
+ * Map of request ids to future response objects.
+ */
+ std::map<unsigned int, FutureResponse*> requestMap;
+
+ /**
+ * Maximum amount of time in milliseconds to wait for a response.
+ */
+ unsigned long maxResponseWaitTime;
+
+ /**
+ * Sync object for accessing the next command id variable.
+ */
+ concurrent::Mutex commandIdMutex;
+
+ /**
+ * Sync object for accessing the request map.
+ */
+ concurrent::Mutex mapMutex;
+
+ /**
+ * Flag to indicate the closed state; true until start() succeeds and again after close().
+ */
+ bool closed;
+
+ private:
+
+ /**
+ * Returns the next available command id, incrementing the counter under commandIdMutex.
+ */
+ unsigned int getNextCommandId() throw ( exceptions::ActiveMQException );
+
+ public:
+
+ /**
+ * Constructor.
+ * @param next the next transport in the chain
+ * @param own indicates if this transport owns the next
+ */
+ ResponseCorrelator( Transport* next, bool own = true );
+
+ virtual ~ResponseCorrelator();
+
+ /**
+ * Gets the maximum wait time for a response in milliseconds.
+ * @return max time that a response can take
+ */
+ virtual unsigned long getMaxResponseWaitTime() const;
+
+ /**
+ * Sets the maximum wait time for a response in milliseconds.
+ * @param milliseconds the max time that a response can take.
+ */
+ virtual void setMaxResponseWaitTime( const unsigned long milliseconds );
+
+ /**
+ * Sends a one-way command. Does not wait for any response from the
+ * broker.
+ * @param command the command to be sent.
+ * @throws CommandIOException if an exception occurs during writing of
+ * the command.
+ * @throws UnsupportedOperationException if this method is not implemented
+ * by this transport.
+ */
+ virtual void oneway( Command* command )
+ throw( CommandIOException, exceptions::UnsupportedOperationException );
+
+ /**
+ * Sends the given request and waits up to the max response wait time for the response.
+ * @param command The request to send.
+ * @return the response from the server.
+ * @throws CommandIOException if an error occurs with the request or no valid response is obtained.
+ */
+ virtual Response* request( Command* command )
+ throw( CommandIOException, exceptions::UnsupportedOperationException );
+
+ /**
+ * This is called in the context of the nested transport's
+ * reading thread. In the case of a response object,
+ * updates the request map and notifies those waiting on the
+ * response. Non-response messages are just delegated to
+ * the command listener.
+ * @param command the command received from the nested transport.
+ */
+ virtual void onCommand( Command* command );
+
+ /**
+ * Starts the next transport in the chain and marks this
+ * correlator as open. Does nothing if it is already started.
+ * Before calling start, the caller must have set both the
+ * command listener and the exception listener, and the next
+ * transport must not be NULL.
+ * @throws CMSException if a listener or the next transport is
+ * missing, or if one is propagated from the next transport's start().
+ */
+ virtual void start() throw( cms::CMSException );
+
+ /**
+ * Closes the next transport if this correlator is open, then marks it
+ * closed. Can be called explicitly, and is also called in the destructor.
+ * NOTE(review): start() does not reject a closed correlator - confirm the restart policy.
+ * @throws CMSException if errors occur.
+ */
+ virtual void close() throw( cms::CMSException );
+
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATOR_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ResponseCorrelatorFactory.h"
+
+#include <activemq/transport/filters/ResponseCorrelator.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+TransportFactory& ResponseCorrelatorFactory::getInstance(void)
+{
+    // Function-local static: the registrar is constructed only once.
+    static TransportFactoryMapRegistrar registrar(
+        "transport.filters.ResponseCorrelator",
+        new ResponseCorrelatorFactory() );
+    TransportFactory& factory = registrar.getFactory();
+    return factory;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Transport* ResponseCorrelatorFactory::createTransport(
+    const activemq::util::Properties& properties AMQCPP_UNUSED,
+    Transport* next, bool own ) throw ( ActiveMQException ) {
+    // properties is not consulted here: the filter is built from next and own.
+    try {
+        Transport* correlator = new ResponseCorrelator( next, own );
+        return correlator;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/ResponseCorrelatorFactory.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATORFACTORY_H_
+#define _ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATORFACTORY_H_
+
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/transport/TransportFactoryMapRegistrar.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+ /**
+ * Factory responsible for creating ResponseCorrelator instances.
+ */
+ class ResponseCorrelatorFactory : public TransportFactory {
+
+ public:
+
+ virtual ~ResponseCorrelatorFactory() {}
+
+ /**
+ * Creates a ResponseCorrelator that wraps the given next transport.
+ * @param properties - transport config values (not used by this factory)
+ * @param next - the next transport in the chain, or NULL
+ * @param own - does the new Transport own the next
+ * @throws ActiveMQException if an error occurs.
+ */
+ virtual Transport* createTransport(
+ const activemq::util::Properties& properties,
+ Transport* next,
+ bool own ) throw ( exceptions::ActiveMQException );
+
+ /**
+ * Returns the factory held by this class's one static registrar.
+ * @return reference to that TransportFactory
+ */
+ static TransportFactory& getInstance();
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FILTERS_RESPONSECORRELATORFACTORY_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TcpTransport.h"
+
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/IOTransport.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/transport/TransportFactoryMap.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::io;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+using namespace activemq::network;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+TcpTransport::TcpTransport( const activemq::util::Properties& properties,
+                            Transport* next,
+                            const bool own )
+:
+    TransportFilter( next, own ),
+    socket( NULL ),
+    loggingInputStream( NULL ),
+    loggingOutputStream( NULL ),
+    bufferedInputStream( NULL ),
+    bufferedOutputStream( NULL )
+{
+    try
+    {
+        if( !properties.hasProperty( "transport.uri" ) ) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "TcpTransport::TcpTransport - "
+                "No URI set for this transport to connect to.");
+        }
+
+        // Check that next is an IOTransport before any resource is acquired:
+        // if this constructor throws, the destructor never runs, so a
+        // socket created ahead of this check would be leaked.
+        IOTransport* ioTransport = dynamic_cast<IOTransport*>( next );
+        if( ioTransport == NULL ){
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "TcpTransport::TcpTransport - "
+                "transport must be of type IOTransport");
+        }
+
+        // Create the IO device we will be communicating over the wire
+        // with. This may need to change for other socket types (e.g. SSL).
+        socket = SocketFactory::createSocket(
+            properties.getProperty( "transport.uri" ), properties );
+
+        InputStream* inputStream = socket->getInputStream();
+        OutputStream* outputStream = socket->getOutputStream();
+
+        // If tcp tracing was enabled, wrap the iostreams with logging streams
+        if( properties.getProperty( "transport.tcpTracingEnabled", "false" ) == "true" ) {
+            loggingInputStream = new LoggingInputStream( inputStream );
+            loggingOutputStream = new LoggingOutputStream( outputStream );
+
+            inputStream = loggingInputStream;
+            outputStream = loggingOutputStream;
+        }
+
+        // Now wrap the input/output streams with buffered streams
+        bufferedInputStream = new BufferedInputStream(inputStream);
+        bufferedOutputStream = new BufferedOutputStream(outputStream);
+
+        // Give the IOTransport the streams.
+        ioTransport->setInputStream( bufferedInputStream );
+        ioTransport->setOutputStream( bufferedOutputStream );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TcpTransport::~TcpTransport()
+{
+    try
+    {
+        // Shut the transport down first. A CMSException from close()
+        // is swallowed so that the owned objects below are still freed.
+        try{ close(); } catch( cms::CMSException& ex ){ /* Absorb */ }
+
+        // delete on a NULL pointer does nothing, so the members can be
+        // released without NULL guards. The release order is unchanged:
+        // socket, logging streams, buffered streams.
+
+        // The socket goes first ...
+        delete socket;
+        socket = NULL;
+
+        // ... then the optional tracing wrappers (NULL unless
+        // tcp tracing was enabled) ...
+        delete loggingInputStream;
+        loggingInputStream = NULL;
+
+        delete loggingOutputStream;
+        loggingOutputStream = NULL;
+
+        // ... and finally the buffered streams that were handed
+        // to the IOTransport.
+        delete bufferedInputStream;
+        bufferedInputStream = NULL;
+
+        delete bufferedOutputStream;
+        bufferedOutputStream = NULL;
+
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::close() throw( cms::CMSException ) {
+    try
+    {
+        // The parent's close runs first, the socket is closed afterwards.
+        TransportFilter::close();
+
+        // Without a socket there is nothing more to do.
+        if( socket == NULL ) {
+            return;
+        }
+        socket->close();
+    }
+    AMQ_CATCH_RETHROW( SocketException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, SocketException )
+    AMQ_CATCHALL_THROW( SocketException )
+}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FILTERS_TCPTRANSPORT_H_
+#define _ACTIVEMQ_TRANSPORT_FILTERS_TCPTRANSPORT_H_
+
+#include <activemq/transport/TransportFilter.h>
+#include <activemq/network/Socket.h>
+#include <activemq/util/Properties.h>
+#include <activemq/io/LoggingInputStream.h>
+#include <activemq/io/LoggingOutputStream.h>
+#include <activemq/io/BufferedInputStream.h>
+#include <activemq/io/BufferedOutputStream.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+ /**
+ * Implements a TCP/IP based transport filter, this transport
+ * is meant to wrap an instance of an IOTransport. The lower
+ * level transport should take care of managing stream reads
+ * and writes.
+ */
+ class TcpTransport : public TransportFilter
+ {
+ private:
+
+ /**
+ * Socket that this transport communicates with
+ */
+ network::Socket* socket;
+
+ io::LoggingInputStream* loggingInputStream;
+ io::LoggingOutputStream* loggingOutputStream;
+
+ io::BufferedInputStream* bufferedInputStream;
+ io::BufferedOutputStream* bufferedOutputStream;
+
+ public:
+
+ /**
+ * Constructor
+ * @param properties the configuration properties for this transport
+ * @param next the next transport in the chain; must be an IOTransport
+ * @param own indicates if this transport owns the next.
+ */
+ TcpTransport( const activemq::util::Properties& properties,
+ Transport* next,
+ const bool own = true );
+
+ virtual ~TcpTransport();
+
+ /**
+ * Delegates to the superclass and then closes the socket.
+ * @throws CMSException if errors occur.
+ */
+ virtual void close() throw( cms::CMSException );
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FILTERS_TCPTRANSPORT_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.cpp?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.cpp Thu Apr 12 13:29:39 2007
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TcpTransportFactory.h"
+
+#include <activemq/transport/filters/TcpTransport.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::filters;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+TransportFactory& TcpTransportFactory::getInstance(void)
+{
+ // Function-local static: the registrar is constructed once, on the first call, from the name "tcp" and a new TcpTransportFactory
+ static TransportFactoryMapRegistrar registrar(
+ "tcp", new TcpTransportFactory() );
+
+ return registrar.getFactory(); // NOTE(review): presumably the factory passed in above - confirm in TransportFactoryMapRegistrar
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Transport* TcpTransportFactory::createTransport(
+ const activemq::util::Properties& properties,
+ Transport* next,
+ bool own ) throw ( ActiveMQException ) {
+ // Forwards the three arguments unchanged to the TcpTransport constructor and returns the heap-allocated result as a raw pointer.
+ try {
+ return new TcpTransport( properties, next, own );
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException ) // NOTE(review): project macro, presumably rethrows an ActiveMQException as-is - confirm
+ AMQ_CATCHALL_THROW( ActiveMQException ) // NOTE(review): project macro, presumably converts any other exception - confirm
+}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.h?view=auto&rev=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransportFactory.h Thu Apr 12 13:29:39 2007
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FILTERS_TCPTRANSPORTFACTORY_H_
+#define _ACTIVEMQ_TRANSPORT_FILTERS_TCPTRANSPORTFACTORY_H_
+
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/transport/TransportFactoryMapRegistrar.h>
+#include <activemq/transport/IOTransportFactory.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+namespace filters{
+
+ /**
+ * Factory Responsible for creating the TcpTransport.
+ */
+ class TcpTransportFactory : public TransportFactory
+ {
+ public:
+
+ virtual ~TcpTransportFactory() {}
+
+ /**
+ * Creates a new TcpTransport from the given arguments and returns it.
+ * @param properties - Object that holds the transport config values
+ * @param next - the next transport in the chain, or NULL
+ * @param own - whether the new Transport owns the next
+ * @throws ActiveMQException if an error occurs.
+ */
+ virtual Transport* createTransport(
+ const activemq::util::Properties& properties,
+ Transport* next,
+ bool own ) throw ( exceptions::ActiveMQException );
+
+ /**
+ * Returns a reference to the shared factory instance, created on first call
+ * @returns TransportFactory reference obtained from the "tcp" registrar
+ */
+ static TransportFactory& getInstance();
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FILTERS_TCPTRANSPORTFACTORY_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#include "OpenwireAsyncSenderTest.h"
#include <integration/IntegrationCommon.h>
@@ -74,7 +74,7 @@
////////////////////////////////////////////////////////////////////////////////
OpenwireAsyncSenderTest::OpenwireAsyncSenderTest()
:
- testSupport("tcp://localhost:61616?wireFormat=openwire&useAsyncSend=true")
+ testSupport("tcp://localhost:61616?wireFormat=openwire&transport.useAsyncSend=true")
{
testSupport.initialize();
}
@@ -93,42 +93,42 @@
cout << "Starting activemqcms test (sending "
<< IntegrationCommon::defaultMsgCount
<< " messages per type and sleeping "
- << IntegrationCommon::defaultDelay
+ << IntegrationCommon::defaultDelay
<< " milli-seconds) ...\n"
<< endl;
}
-
+
// Create CMS Object for Comms
cms::Session* session = testSupport.getSession();
cms::Topic* topic = testSupport.getSession()->createTopic("mytopic");
- cms::MessageConsumer* consumer =
- session->createConsumer( topic );
+ cms::MessageConsumer* consumer =
+ session->createConsumer( topic );
consumer->setMessageListener( &testSupport );
- cms::MessageProducer* producer =
+ cms::MessageProducer* producer =
session->createProducer( topic );
// Send some text messages
- testSupport.produceTextMessages(
+ testSupport.produceTextMessages(
*producer, IntegrationCommon::defaultMsgCount );
-
+
// Send some bytes messages.
- testSupport.produceTextMessages(
+ testSupport.produceTextMessages(
*producer, IntegrationCommon::defaultMsgCount );
// Wait for the messages to get here
testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
-
+
unsigned int numReceived = testSupport.getNumReceived();
if( IntegrationCommon::debug ) {
printf("received: %d\n", numReceived );
}
- CPPUNIT_ASSERT(
+ CPPUNIT_ASSERT(
numReceived == IntegrationCommon::defaultMsgCount * 2 );
if( IntegrationCommon::debug ) {
printf("Shutting Down\n" );
}
- delete producer;
+ delete producer;
delete consumer;
delete topic;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp?view=diff&rev=528222&r1=528221&r2=528222
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp Thu Apr 12 13:29:39 2007
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#include "AsyncSenderTest.h"
#include <integration/IntegrationCommon.h>
@@ -74,7 +74,7 @@
////////////////////////////////////////////////////////////////////////////////
AsyncSenderTest::AsyncSenderTest()
:
- testSupport("tcp://localhost:61613?wireFormat=stomp&useAsyncSend=true")
+ testSupport("tcp://localhost:61613?wireFormat=stomp&transport.useAsyncSend=true")
{
testSupport.initialize();
}
@@ -93,42 +93,42 @@
cout << "Starting activemqcms test (sending "
<< IntegrationCommon::defaultMsgCount
<< " messages per type and sleeping "
- << IntegrationCommon::defaultDelay
+ << IntegrationCommon::defaultDelay
<< " milli-seconds) ...\n"
<< endl;
}
-
+
// Create CMS Object for Comms
cms::Session* session = testSupport.getSession();
cms::Topic* topic = testSupport.getSession()->createTopic("mytopic");
- cms::MessageConsumer* consumer =
- session->createConsumer( topic );
+ cms::MessageConsumer* consumer =
+ session->createConsumer( topic );
consumer->setMessageListener( &testSupport );
- cms::MessageProducer* producer =
+ cms::MessageProducer* producer =
session->createProducer( topic );
// Send some text messages
- testSupport.produceTextMessages(
+ testSupport.produceTextMessages(
*producer, IntegrationCommon::defaultMsgCount );
-
+
// Send some bytes messages.
- testSupport.produceTextMessages(
+ testSupport.produceTextMessages(
*producer, IntegrationCommon::defaultMsgCount );
// Wait for the messages to get here
testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
-
+
unsigned int numReceived = testSupport.getNumReceived();
if( IntegrationCommon::debug ) {
printf("received: %d\n", numReceived );
}
- CPPUNIT_ASSERT(
+ CPPUNIT_ASSERT(
numReceived == IntegrationCommon::defaultMsgCount * 2 );
if( IntegrationCommon::debug ) {
printf("Shutting Down\n" );
}
- delete producer;
+ delete producer;
delete consumer;
delete topic;
}