You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/03 00:41:42 UTC
svn commit: r1393223 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/activemq/transport/correlator/ResponseCorrelator.cpp
main/activemq/transport/correlator/ResponseCorrelator.h
test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
Author: tabish
Date: Tue Oct 2 22:41:41 2012
New Revision: 1393223
URL: http://svn.apache.org/viewvc?rev=1393223&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-429
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp?rev=1393223&r1=1393222&r2=1393223&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp Tue Oct 2 22:41:41 2012
@@ -18,6 +18,15 @@
#include "ResponseCorrelator.h"
#include <algorithm>
+#include <decaf/util/ArrayList.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+#include <map>
+
+#include <activemq/commands/Response.h>
+#include <activemq/commands/ExceptionResponse.h>
+#include <activemq/transport/correlator/FutureResponse.h>
+
using namespace std;
using namespace activemq;
using namespace activemq::transport;
@@ -27,15 +36,17 @@ using namespace decaf;
using namespace decaf::io;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
+using namespace decaf::util;
using namespace decaf::util::concurrent;
+////////////////////////////////////////////////////////////////////////////////
namespace {
class ResponseFinalizer {
private:
- ResponseFinalizer( const ResponseFinalizer& );
- ResponseFinalizer operator= ( const ResponseFinalizer& );
+ ResponseFinalizer(const ResponseFinalizer&);
+ ResponseFinalizer operator=(const ResponseFinalizer&);
private:
@@ -45,43 +56,79 @@ namespace {
public:
- ResponseFinalizer( Mutex* mutex, int commandId, std::map<unsigned int, Pointer<FutureResponse> >* map ) :
- mutex( mutex ), commandId( commandId ), map( map ) {
+ ResponseFinalizer(Mutex* mutex, int commandId, std::map<unsigned int, Pointer<FutureResponse> >* map) :
+ mutex(mutex), commandId(commandId), map(map) {
}
~ResponseFinalizer() {
- synchronized( mutex ){
- map->erase( commandId );
+ synchronized(mutex){
+ map->erase(commandId);
}
}
};
}
////////////////////////////////////////////////////////////////////////////////
-ResponseCorrelator::ResponseCorrelator( const Pointer<Transport>& next ) :
- TransportFilter( next ), nextCommandId(1), requestMap(), mapMutex(), closed( true ) {
+namespace activemq{
+namespace transport{
+namespace correlator{
+
+ class CorrelatorData {
+ public:
+
+ // The next command id for sent commands.
+ decaf::util::concurrent::atomic::AtomicInteger nextCommandId;
+
+ // Map of request ids to future response objects.
+ std::map<unsigned int, Pointer<FutureResponse> > requestMap;
+
+ // Sync object for accessing the request map.
+ decaf::util::concurrent::Mutex mapMutex;
+
+ // Flag to indicate the closed state.
+ bool closed;
+
+ // Indicates that an the filter is now unusable from some error.
+ Pointer<Exception> priorError;
+
+ public:
+
+ CorrelatorData() : nextCommandId(1), requestMap(), mapMutex(), closed(true), priorError(NULL) {
+ }
+
+ };
+
+}}}
+
+
+////////////////////////////////////////////////////////////////////////////////
+ResponseCorrelator::ResponseCorrelator(const Pointer<Transport>& next) : TransportFilter(next), impl(new CorrelatorData) {
}
////////////////////////////////////////////////////////////////////////////////
ResponseCorrelator::~ResponseCorrelator(){
// Close the transport and destroy it.
- close();
+ try {
+ close();
+ }
+ AMQ_CATCHALL_NOTHROW()
+
+ delete this->impl;
}
////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::oneway( const Pointer<Command>& command ) {
+void ResponseCorrelator::oneway(const Pointer<Command>& command) {
- try{
- command->setCommandId( nextCommandId.getAndIncrement() );
- command->setResponseRequired( false );
+ try {
+ command->setCommandId(this->impl->nextCommandId.getAndIncrement());
+ command->setResponseRequired(false);
- if( closed || next == NULL ){
- throw IOException( __FILE__, __LINE__,
- "transport already closed" );
+ if (this->impl->closed || next == NULL) {
+ throw IOException(__FILE__, __LINE__, "transport already closed");
}
- next->oneway( command );
+ next->oneway(command);
}
AMQ_CATCH_RETHROW( UnsupportedOperationException )
AMQ_CATCH_RETHROW( IOException )
@@ -91,39 +138,44 @@ void ResponseCorrelator::oneway( const P
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> ResponseCorrelator::request( const Pointer<Command>& command ) {
+Pointer<Response> ResponseCorrelator::request(const Pointer<Command>& command) {
- try{
+ try {
- command->setCommandId( nextCommandId.getAndIncrement() );
- command->setResponseRequired( true );
+ command->setCommandId(this->impl->nextCommandId.getAndIncrement());
+ command->setResponseRequired(true);
- // Add a future response object to the map indexed by this
- // command id.
- Pointer<FutureResponse> futureResponse( new FutureResponse() );
+ // Add a future response object to the map indexed by this command id.
+ Pointer<FutureResponse> futureResponse(new FutureResponse());
+ Pointer<Exception> priorError;
+
+ synchronized(&this->impl->mapMutex) {
+ priorError = this->impl->priorError;
+ if (priorError == NULL) {
+ this->impl->requestMap.insert(
+ make_pair((unsigned int) command->getCommandId(), futureResponse));
+ }
+ }
- synchronized( &mapMutex ){
- requestMap.insert( make_pair( (unsigned int)command->getCommandId(), futureResponse ) );
+ if (priorError != NULL) {
+ throw IOException(__FILE__, __LINE__, this->impl->priorError->getMessage().c_str());
}
// The finalizer will cleanup the map even if an exception is thrown.
- ResponseFinalizer finalizer( &mapMutex, command->getCommandId(), &requestMap );
+ ResponseFinalizer finalizer(&this->impl->mapMutex, command->getCommandId(), &this->impl->requestMap);
- // Wait to be notified of the response via the futureResponse
- // object.
+ // Wait to be notified of the response via the futureResponse object.
Pointer<commands::Response> response;
// Send the request.
- next->oneway( command );
+ next->oneway(command);
// Get the response.
response = futureResponse->getResponse();
- if( response == NULL ){
-
- throw IOException( __FILE__, __LINE__,
- "No valid response received for command: %s, check broker.",
- command->toString().c_str() );
+ if (response == NULL) {
+ throw IOException(__FILE__, __LINE__,
+ "No valid response received for command: %s, check broker.", command->toString().c_str());
}
return response;
@@ -136,38 +188,44 @@ Pointer<Response> ResponseCorrelator::re
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> ResponseCorrelator::request( const Pointer<Command>& command, unsigned int timeout ) {
+Pointer<Response> ResponseCorrelator::request(const Pointer<Command>& command, unsigned int timeout) {
- try{
- command->setCommandId( nextCommandId.getAndIncrement() );
- command->setResponseRequired( true );
+ try {
- // Add a future response object to the map indexed by this
- // command id.
- Pointer<FutureResponse> futureResponse( new FutureResponse() );
+ command->setCommandId(this->impl->nextCommandId.getAndIncrement());
+ command->setResponseRequired(true);
- synchronized( &mapMutex ){
- requestMap.insert( make_pair( (unsigned int)command->getCommandId(), futureResponse ) );
+ // Add a future response object to the map indexed by this command id.
+ Pointer<FutureResponse> futureResponse(new FutureResponse());
+ Pointer<Exception> priorError;
+
+ synchronized(&this->impl->mapMutex) {
+ priorError = this->impl->priorError;
+ if (priorError == NULL) {
+ this->impl->requestMap.insert(
+ make_pair((unsigned int) command->getCommandId(), futureResponse));
+ }
+ }
+
+ if (priorError != NULL) {
+ throw IOException(__FILE__, __LINE__, this->impl->priorError->getMessage().c_str());
}
// The finalizer will cleanup the map even if an exception is thrown.
- ResponseFinalizer finalizer( &mapMutex, command->getCommandId(), &requestMap );
+ ResponseFinalizer finalizer(&this->impl->mapMutex, command->getCommandId(), &this->impl->requestMap);
- // Wait to be notified of the response via the futureResponse
- // object.
+ // Wait to be notified of the response via the futureResponse object.
Pointer<commands::Response> response;
// Send the request.
- next->oneway( command );
+ next->oneway(command);
// Get the response.
- response = futureResponse->getResponse( timeout );
-
- if( response == NULL ){
+ response = futureResponse->getResponse(timeout);
- throw IOException( __FILE__, __LINE__,
- "No valid response received for command: %s, check broker.",
- command->toString().c_str() );
+ if (response == NULL) {
+ throw IOException(__FILE__, __LINE__,
+ "No valid response received for command: %s, check broker.", command->toString().c_str());
}
return response;
@@ -180,31 +238,24 @@ Pointer<Response> ResponseCorrelator::re
}
////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::onCommand( const Pointer<Command>& command ) {
+void ResponseCorrelator::onCommand(const Pointer<Command>& command) {
- // Let's see if the incoming command is a response.
- if( !command->isResponse() ){
-
- // It's a non-response - just notify the listener.
- fire( command );
+ // Let's see if the incoming command is a response, if not we just pass it along
+ // and allow outstanding requests to keep waiting without stalling control commands.
+ if (!command->isResponse()) {
+ fire(command);
return;
}
- Pointer<Response> response =
- command.dynamicCast< Response >();
+ Pointer<Response> response = command.dynamicCast<Response>();
// It is a response - let's correlate ...
- synchronized( &mapMutex ){
+ synchronized(&this->impl->mapMutex) {
// Look the future request up based on the correlation id.
- std::map< unsigned int, Pointer<FutureResponse> >::iterator iter =
- requestMap.find( response->getCorrelationId() );
- if( iter == requestMap.end() ){
-
- // This is not terrible - just log it.
- //printf( "ResponseCorrelator::onCommand() - "
- // "received unknown response for request: %d\n",
- // response->getCorrelationId() );
+ std::map<unsigned int, Pointer<FutureResponse> >::iterator iter =
+ this->impl->requestMap.find(response->getCorrelationId());
+ if (iter == this->impl->requestMap.end()) {
return;
}
@@ -212,7 +263,7 @@ void ResponseCorrelator::onCommand( cons
Pointer<FutureResponse> futureResponse = iter->second;
// Set the response property in the future response.
- futureResponse->setResponse( response );
+ futureResponse->setResponse(response);
}
}
@@ -221,26 +272,23 @@ void ResponseCorrelator::start() {
try{
- /**
- * We're already started.
- */
- if( !closed ){
+ if (!this->impl->closed) {
return;
}
- if( listener == NULL ){
- throw IOException( __FILE__, __LINE__, "exceptionListener is invalid" );
+ if (listener == NULL) {
+ throw IOException(__FILE__, __LINE__, "exceptionListener is invalid");
}
- if( next == NULL ){
- throw IOException( __FILE__, __LINE__, "next transport is NULL" );
+ if (next == NULL) {
+ throw IOException(__FILE__, __LINE__, "next transport is NULL");
}
// Start the delegate transport object.
next->start();
// Mark it as open.
- closed = false;
+ this->impl->closed = false;
}
AMQ_CATCH_RETHROW( IOException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
@@ -250,21 +298,15 @@ void ResponseCorrelator::start() {
////////////////////////////////////////////////////////////////////////////////
void ResponseCorrelator::close() {
- try{
+ try {
- // Wake-up any outstanding requests.
- synchronized( &mapMutex ){
- std::map<unsigned int, Pointer<FutureResponse> >::iterator iter = requestMap.begin();
- for( ; iter != requestMap.end(); ++iter ){
- iter->second->setResponse( Pointer<Response>() );
- }
- }
+ dispose(Pointer<Exception>(new IOException(__FILE__, __LINE__, "Transport Stopped")));
- if( !closed && next != NULL ){
+ if (!this->impl->closed && next != NULL) {
next->close();
}
- closed = true;
+ this->impl->closed = true;
}
AMQ_CATCH_RETHROW( IOException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
@@ -272,17 +314,32 @@ void ResponseCorrelator::close() {
}
////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::onTransportException( Transport* source AMQCPP_UNUSED,
- const decaf::lang::Exception& ex ) {
+void ResponseCorrelator::onException(const decaf::lang::Exception& ex) {
+ dispose(Pointer<Exception>(ex.clone()));
+ TransportFilter::onException(ex);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::dispose(const Pointer<Exception> error) {
- // Trigger each outstanding request to complete so that we don't hang
- // forever waiting for one that has been sent without timeout.
- synchronized( &mapMutex ){
- std::map< unsigned int, Pointer<FutureResponse> >::iterator iter = requestMap.begin();
- for( ; iter != requestMap.end(); ++iter ){
- iter->second->setResponse( Pointer<Response>() );
+ ArrayList<Pointer<FutureResponse> > requests;
+ synchronized(&this->impl->mapMutex){
+ if (this->impl->priorError == NULL) {
+ this->impl->priorError = error;
+ requests.ensureCapacity((int)this->impl->requestMap.size());
+ std::map<unsigned int, Pointer<FutureResponse> >::iterator iter = this->impl->requestMap.begin();
+ for (; iter != this->impl->requestMap.end(); ++iter) {
+ requests.add(iter->second);
+ }
+ this->impl->requestMap.clear();
}
}
- fire( ex );
+ if (!requests.isEmpty()) {
+ Pointer<Iterator<Pointer<FutureResponse> > > iter(requests.iterator());
+ while (iter->hasNext()) {
+ Pointer<FutureResponse> response = iter->next();
+ response->setResponse(Pointer<commands::Response>(new commands::ExceptionResponse));
+ }
+ }
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h?rev=1393223&r1=1393222&r2=1393223&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h Tue Oct 2 22:41:41 2012
@@ -20,14 +20,11 @@
#include <activemq/util/Config.h>
#include <activemq/transport/TransportFilter.h>
-#include <activemq/transport/correlator/FutureResponse.h>
#include <activemq/commands/Command.h>
#include <activemq/commands/Response.h>
-#include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/concurrent/Concurrent.h>
-#include <decaf/util/concurrent/atomic/AtomicInteger.h>
-#include <map>
-#include <stdio.h>
+
+#include <decaf/lang/Exception.h>
+#include <decaf/lang/Pointer.h>
namespace activemq{
namespace transport{
@@ -37,68 +34,54 @@ namespace correlator{
using activemq::commands::Command;
using activemq::commands::Response;
+ class CorrelatorData;
+
/**
- * This type of transport filter is responsible for correlating
- * asynchronous responses with requests. Non-response messages
- * are simply sent directly to the CommandListener. It owns
- * the transport that it
+ * This type of transport filter is responsible for correlating asynchronous responses
+ * with requests. Non-response messages are simply sent directly to the CommandListener.
+ * It owns the transport that it
*/
class AMQCPP_API ResponseCorrelator : public TransportFilter {
private:
- /**
- * The next command id for sent commands.
- */
- decaf::util::concurrent::atomic::AtomicInteger nextCommandId;
-
- /**
- * Map of request ids to future response objects.
- */
- std::map<unsigned int, Pointer<FutureResponse> > requestMap;
-
- /**
- * Sync object for accessing the request map.
- */
- decaf::util::concurrent::Mutex mapMutex;
-
- /**
- * Flag to indicate the closed state.
- */
- bool closed;
+ CorrelatorData* impl;
public:
/**
- * Constructor.
- * @param next the next transport in the chain
+ * Creates a new ResponseCorrelator transport filter that wraps the given transport.
+ *
+ * @param next
+ * the next transport in the chain
+ *
+ * @throws NullPointerException if next if NULL.
*/
- ResponseCorrelator( const Pointer<Transport>& next );
+ ResponseCorrelator(const Pointer<Transport>& next);
virtual ~ResponseCorrelator();
public: // Transport Methods
- virtual void oneway( const Pointer<Command>& command );
+ virtual void oneway(const Pointer<Command>& command);
- virtual Pointer<Response> request( const Pointer<Command>& command );
+ virtual Pointer<Response> request(const Pointer<Command>& command);
- virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout );
+ virtual Pointer<Response> request(const Pointer<Command>& command, unsigned int timeout);
virtual void start();
virtual void close();
/**
- * This is called in the context of the nested transport's
- * reading thread. In the case of a response object,
- * updates the request map and notifies those waiting on the
- * response. Non-response messages are just delegated to
- * the command listener.
+ * This is called in the context of the nested transport's reading thread. In
+ * the case of a response object, updates the request map and notifies those
+ * waiting on the response. Non-response messages are just delegated to the
+ * command listener.
*
* @param command
* The received from the nested transport.
*/
- virtual void onCommand( const Pointer<Command>& command );
+ virtual void onCommand(const Pointer<Command>& command);
/**
* Event handler for an exception from a command transport.
@@ -108,7 +91,11 @@ namespace correlator{
* @param ex
* The exception that was caught.
*/
- virtual void onTransportException( Transport* source, const decaf::lang::Exception& ex );
+ virtual void onException(const decaf::lang::Exception& ex);
+
+ private:
+
+ void dispose(const Pointer<decaf::lang::Exception> ex);
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp?rev=1393223&r1=1393222&r2=1393223&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp Tue Oct 2 22:41:41 2012
@@ -406,44 +406,38 @@ void ResponseCorrelatorTest::testOneway(
////////////////////////////////////////////////////////////////////////////////
void ResponseCorrelatorTest::testTransportException(){
- try{
-
- MyListener listener;
- Pointer<MyBrokenTransport> transport( new MyBrokenTransport() );
- ResponseCorrelator correlator( transport );
- correlator.setTransportListener( &listener );
- CPPUNIT_ASSERT( transport->listener == &correlator );
+ MyListener listener;
+ Pointer<MyBrokenTransport> transport( new MyBrokenTransport() );
+ ResponseCorrelator correlator( transport );
+ correlator.setTransportListener( &listener );
+ CPPUNIT_ASSERT( transport->listener == &correlator );
- // Give the thread a little time to get up and running.
- synchronized( &(transport->startedMutex) ) {
- // Start the transport.
- correlator.start();
+ // Give the thread a little time to get up and running.
+ synchronized( &(transport->startedMutex) ) {
+ // Start the transport.
+ correlator.start();
- transport->startedMutex.wait();
- }
+ transport->startedMutex.wait();
+ }
- // Send one request.
- Pointer<MyCommand> cmd( new MyCommand );
- try{
- correlator.request( cmd, 500 );
- CPPUNIT_ASSERT(false);
- }catch( IOException& ex ){
- // Expected.
- }
+ // Send one request.
+ Pointer<MyCommand> cmd( new MyCommand );
+ try{
+ correlator.request( cmd, 1000 );
+ }catch( IOException& ex ){
+ CPPUNIT_ASSERT(false);
+ }
- // Wait to make sure we get the asynchronous message back.
- decaf::lang::Thread::sleep( 200 );
+ // Wait to make sure we get the asynchronous message back.
+ decaf::lang::Thread::sleep( 200 );
- // Since our transport relays our original command back at us as a
- // non-response message, check to make sure we received it and that
- // it is the original command.
- CPPUNIT_ASSERT( listener.commands.size() == 0 );
- CPPUNIT_ASSERT( listener.exCount == 1 );
+ // Since our transport relays our original command back at us as a
+ // non-response message, check to make sure we received it and that
+ // it is the original command.
+ CPPUNIT_ASSERT( listener.commands.size() == 0 );
+ CPPUNIT_ASSERT( listener.exCount == 1 );
- correlator.close();
- }
- AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
- AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+ correlator.close();
}
////////////////////////////////////////////////////////////////////////////////