You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/03 00:41:42 UTC

svn commit: r1393223 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/transport/correlator/ResponseCorrelator.cpp main/activemq/transport/correlator/ResponseCorrelator.h test/activemq/transport/correlator/ResponseCorrelatorTest.cpp

Author: tabish
Date: Tue Oct  2 22:41:41 2012
New Revision: 1393223

URL: http://svn.apache.org/viewvc?rev=1393223&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-429


Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp?rev=1393223&r1=1393222&r2=1393223&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp Tue Oct  2 22:41:41 2012
@@ -18,6 +18,15 @@
 #include "ResponseCorrelator.h"
 #include <algorithm>
 
+#include <decaf/util/ArrayList.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+#include <map>
+
+#include <activemq/commands/Response.h>
+#include <activemq/commands/ExceptionResponse.h>
+#include <activemq/transport/correlator/FutureResponse.h>
+
 using namespace std;
 using namespace activemq;
 using namespace activemq::transport;
@@ -27,15 +36,17 @@ using namespace decaf;
 using namespace decaf::io;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
+using namespace decaf::util;
 using namespace decaf::util::concurrent;
 
+////////////////////////////////////////////////////////////////////////////////
 namespace {
 
     class ResponseFinalizer {
     private:
 
-        ResponseFinalizer( const ResponseFinalizer& );
-        ResponseFinalizer operator= ( const ResponseFinalizer& );
+        ResponseFinalizer(const ResponseFinalizer&);
+        ResponseFinalizer operator=(const ResponseFinalizer&);
 
     private:
 
@@ -45,43 +56,79 @@ namespace {
 
     public:
 
-        ResponseFinalizer( Mutex* mutex, int commandId, std::map<unsigned int, Pointer<FutureResponse> >* map ) :
-            mutex( mutex ), commandId( commandId ), map( map ) {
+        ResponseFinalizer(Mutex* mutex, int commandId, std::map<unsigned int, Pointer<FutureResponse> >* map) :
+            mutex(mutex), commandId(commandId), map(map) {
         }
 
         ~ResponseFinalizer() {
-            synchronized( mutex ){
-                map->erase( commandId );
+            synchronized(mutex){
+                map->erase(commandId);
             }
         }
     };
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ResponseCorrelator::ResponseCorrelator( const Pointer<Transport>& next ) :
-    TransportFilter( next ), nextCommandId(1), requestMap(), mapMutex(), closed( true ) {
+namespace activemq{
+namespace transport{
+namespace correlator{
+
+    class CorrelatorData {
+    public:
+
+        // The next command id for sent commands.
+        decaf::util::concurrent::atomic::AtomicInteger nextCommandId;
+
+        // Map of request ids to future response objects.
+        std::map<unsigned int, Pointer<FutureResponse> > requestMap;
+
+        // Sync object for accessing the request map.
+        decaf::util::concurrent::Mutex mapMutex;
+
+        // Flag to indicate the closed state.
+        bool closed;
+
+        // Indicates that the filter is now unusable because of a prior error.
+        Pointer<Exception> priorError;
+
+    public:
+
+        CorrelatorData() : nextCommandId(1), requestMap(), mapMutex(), closed(true), priorError(NULL) {
+        }
+
+    };
+
+}}}
+
+
+////////////////////////////////////////////////////////////////////////////////
+ResponseCorrelator::ResponseCorrelator(const Pointer<Transport>& next) : TransportFilter(next), impl(new CorrelatorData) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 ResponseCorrelator::~ResponseCorrelator(){
 
     // Close the transport and destroy it.
-    close();
+    try {
+        close();
+    }
+    AMQ_CATCHALL_NOTHROW()
+
+    delete this->impl;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::oneway( const Pointer<Command>& command ) {
+void ResponseCorrelator::oneway(const Pointer<Command>& command) {
 
-    try{
-        command->setCommandId( nextCommandId.getAndIncrement() );
-        command->setResponseRequired( false );
+    try {
+        command->setCommandId(this->impl->nextCommandId.getAndIncrement());
+        command->setResponseRequired(false);
 
-        if( closed || next == NULL ){
-            throw IOException( __FILE__, __LINE__,
-                "transport already closed" );
+        if (this->impl->closed || next == NULL) {
+            throw IOException(__FILE__, __LINE__, "transport already closed");
         }
 
-        next->oneway( command );
+        next->oneway(command);
     }
     AMQ_CATCH_RETHROW( UnsupportedOperationException )
     AMQ_CATCH_RETHROW( IOException )
@@ -91,39 +138,44 @@ void ResponseCorrelator::oneway( const P
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> ResponseCorrelator::request( const Pointer<Command>& command ) {
+Pointer<Response> ResponseCorrelator::request(const Pointer<Command>& command) {
 
-    try{
+    try {
 
-        command->setCommandId( nextCommandId.getAndIncrement() );
-        command->setResponseRequired( true );
+        command->setCommandId(this->impl->nextCommandId.getAndIncrement());
+        command->setResponseRequired(true);
 
-        // Add a future response object to the map indexed by this
-        // command id.
-        Pointer<FutureResponse> futureResponse( new FutureResponse() );
+        // Add a future response object to the map indexed by this command id.
+        Pointer<FutureResponse> futureResponse(new FutureResponse());
+        Pointer<Exception> priorError;
+
+        synchronized(&this->impl->mapMutex) {
+            priorError = this->impl->priorError;
+            if (priorError == NULL) {
+                this->impl->requestMap.insert(
+                    make_pair((unsigned int) command->getCommandId(), futureResponse));
+            }
+        }
 
-        synchronized( &mapMutex ){
-            requestMap.insert( make_pair( (unsigned int)command->getCommandId(), futureResponse ) );
+        if (priorError != NULL) {
+            throw IOException(__FILE__, __LINE__, this->impl->priorError->getMessage().c_str());
         }
 
         // The finalizer will cleanup the map even if an exception is thrown.
-        ResponseFinalizer finalizer( &mapMutex, command->getCommandId(), &requestMap );
+        ResponseFinalizer finalizer(&this->impl->mapMutex, command->getCommandId(), &this->impl->requestMap);
 
-        // Wait to be notified of the response via the futureResponse
-        // object.
+        // Wait to be notified of the response via the futureResponse object.
         Pointer<commands::Response> response;
 
         // Send the request.
-        next->oneway( command );
+        next->oneway(command);
 
         // Get the response.
         response = futureResponse->getResponse();
 
-        if( response == NULL ){
-
-            throw IOException( __FILE__, __LINE__,
-                "No valid response received for command: %s, check broker.",
-                command->toString().c_str() );
+        if (response == NULL) {
+            throw IOException(__FILE__, __LINE__,
+                "No valid response received for command: %s, check broker.", command->toString().c_str());
         }
 
         return response;
@@ -136,38 +188,44 @@ Pointer<Response> ResponseCorrelator::re
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> ResponseCorrelator::request( const Pointer<Command>& command, unsigned int timeout ) {
+Pointer<Response> ResponseCorrelator::request(const Pointer<Command>& command, unsigned int timeout) {
 
-    try{
-        command->setCommandId( nextCommandId.getAndIncrement() );
-        command->setResponseRequired( true );
+    try {
 
-        // Add a future response object to the map indexed by this
-        // command id.
-        Pointer<FutureResponse> futureResponse( new FutureResponse() );
+        command->setCommandId(this->impl->nextCommandId.getAndIncrement());
+        command->setResponseRequired(true);
 
-        synchronized( &mapMutex ){
-            requestMap.insert( make_pair( (unsigned int)command->getCommandId(), futureResponse ) );
+        // Add a future response object to the map indexed by this command id.
+        Pointer<FutureResponse> futureResponse(new FutureResponse());
+        Pointer<Exception> priorError;
+
+        synchronized(&this->impl->mapMutex) {
+            priorError = this->impl->priorError;
+            if (priorError == NULL) {
+                this->impl->requestMap.insert(
+                    make_pair((unsigned int) command->getCommandId(), futureResponse));
+            }
+        }
+
+        if (priorError != NULL) {
+            throw IOException(__FILE__, __LINE__, this->impl->priorError->getMessage().c_str());
         }
 
         // The finalizer will cleanup the map even if an exception is thrown.
-        ResponseFinalizer finalizer( &mapMutex, command->getCommandId(), &requestMap );
+        ResponseFinalizer finalizer(&this->impl->mapMutex, command->getCommandId(), &this->impl->requestMap);
 
-        // Wait to be notified of the response via the futureResponse
-        // object.
+        // Wait to be notified of the response via the futureResponse object.
         Pointer<commands::Response> response;
 
         // Send the request.
-        next->oneway( command );
+        next->oneway(command);
 
         // Get the response.
-        response = futureResponse->getResponse( timeout );
-
-        if( response == NULL ){
+        response = futureResponse->getResponse(timeout);
 
-            throw IOException( __FILE__, __LINE__,
-                "No valid response received for command: %s, check broker.",
-                command->toString().c_str() );
+        if (response == NULL) {
+            throw IOException(__FILE__, __LINE__,
+                "No valid response received for command: %s, check broker.", command->toString().c_str());
         }
 
         return response;
@@ -180,31 +238,24 @@ Pointer<Response> ResponseCorrelator::re
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::onCommand( const Pointer<Command>& command ) {
+void ResponseCorrelator::onCommand(const Pointer<Command>& command) {
 
-    // Let's see if the incoming command is a response.
-    if( !command->isResponse() ){
-
-        // It's a non-response - just notify the listener.
-        fire( command );
+    // Let's see if the incoming command is a response, if not we just pass it along
+    // and allow outstanding requests to keep waiting without stalling control commands.
+    if (!command->isResponse()) {
+        fire(command);
         return;
     }
 
-    Pointer<Response> response =
-        command.dynamicCast< Response >();
+    Pointer<Response> response = command.dynamicCast<Response>();
 
     // It is a response - let's correlate ...
-    synchronized( &mapMutex ){
+    synchronized(&this->impl->mapMutex) {
 
         // Look the future request up based on the correlation id.
-        std::map< unsigned int, Pointer<FutureResponse> >::iterator iter =
-            requestMap.find( response->getCorrelationId() );
-        if( iter == requestMap.end() ){
-
-            // This is not terrible - just log it.
-            //printf( "ResponseCorrelator::onCommand() - "
-            //        "received unknown response for request: %d\n",
-            //        response->getCorrelationId() );
+        std::map<unsigned int, Pointer<FutureResponse> >::iterator iter =
+            this->impl->requestMap.find(response->getCorrelationId());
+        if (iter == this->impl->requestMap.end()) {
             return;
         }
 
@@ -212,7 +263,7 @@ void ResponseCorrelator::onCommand( cons
         Pointer<FutureResponse> futureResponse = iter->second;
 
         // Set the response property in the future response.
-        futureResponse->setResponse( response );
+        futureResponse->setResponse(response);
     }
 }
 
@@ -221,26 +272,23 @@ void ResponseCorrelator::start() {
 
     try{
 
-        /**
-         * We're already started.
-         */
-        if( !closed ){
+        if (!this->impl->closed) {
             return;
         }
 
-        if( listener == NULL ){
-            throw IOException( __FILE__, __LINE__, "exceptionListener is invalid" );
+        if (listener == NULL) {
+            throw IOException(__FILE__, __LINE__, "exceptionListener is invalid");
         }
 
-        if( next == NULL ){
-            throw IOException( __FILE__, __LINE__, "next transport is NULL" );
+        if (next == NULL) {
+            throw IOException(__FILE__, __LINE__, "next transport is NULL");
         }
 
         // Start the delegate transport object.
         next->start();
 
         // Mark it as open.
-        closed = false;
+        this->impl->closed = false;
     }
     AMQ_CATCH_RETHROW( IOException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
@@ -250,21 +298,15 @@ void ResponseCorrelator::start() {
 ////////////////////////////////////////////////////////////////////////////////
 void ResponseCorrelator::close() {
 
-    try{
+    try {
 
-        // Wake-up any outstanding requests.
-        synchronized( &mapMutex ){
-            std::map<unsigned int, Pointer<FutureResponse> >::iterator iter = requestMap.begin();
-            for( ; iter != requestMap.end(); ++iter ){
-                iter->second->setResponse( Pointer<Response>() );
-            }
-        }
+        dispose(Pointer<Exception>(new IOException(__FILE__, __LINE__, "Transport Stopped")));
 
-        if( !closed && next != NULL ){
+        if (!this->impl->closed && next != NULL) {
             next->close();
         }
 
-        closed = true;
+        this->impl->closed = true;
     }
     AMQ_CATCH_RETHROW( IOException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
@@ -272,17 +314,32 @@ void ResponseCorrelator::close() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::onTransportException( Transport* source AMQCPP_UNUSED,
-                                               const decaf::lang::Exception& ex ) {
+void ResponseCorrelator::onException(const decaf::lang::Exception& ex) {
+    dispose(Pointer<Exception>(ex.clone()));
+    TransportFilter::onException(ex);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::dispose(const Pointer<Exception> error) {
 
-    // Trigger each outstanding request to complete so that we don't hang
-    // forever waiting for one that has been sent without timeout.
-    synchronized( &mapMutex ){
-        std::map< unsigned int, Pointer<FutureResponse> >::iterator iter = requestMap.begin();
-        for( ; iter != requestMap.end(); ++iter ){
-            iter->second->setResponse( Pointer<Response>() );
+    ArrayList<Pointer<FutureResponse> > requests;
+    synchronized(&this->impl->mapMutex){
+        if (this->impl->priorError == NULL) {
+            this->impl->priorError = error;
+            requests.ensureCapacity((int)this->impl->requestMap.size());
+            std::map<unsigned int, Pointer<FutureResponse> >::iterator iter = this->impl->requestMap.begin();
+            for (; iter != this->impl->requestMap.end(); ++iter) {
+                requests.add(iter->second);
+            }
+            this->impl->requestMap.clear();
         }
     }
 
-    fire( ex );
+    if (!requests.isEmpty()) {
+        Pointer<Iterator<Pointer<FutureResponse> > > iter(requests.iterator());
+        while (iter->hasNext()) {
+            Pointer<FutureResponse> response = iter->next();
+            response->setResponse(Pointer<commands::Response>(new commands::ExceptionResponse));
+        }
+    }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h?rev=1393223&r1=1393222&r2=1393223&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h Tue Oct  2 22:41:41 2012
@@ -20,14 +20,11 @@
 
 #include <activemq/util/Config.h>
 #include <activemq/transport/TransportFilter.h>
-#include <activemq/transport/correlator/FutureResponse.h>
 #include <activemq/commands/Command.h>
 #include <activemq/commands/Response.h>
-#include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/concurrent/Concurrent.h>
-#include <decaf/util/concurrent/atomic/AtomicInteger.h>
-#include <map>
-#include <stdio.h>
+
+#include <decaf/lang/Exception.h>
+#include <decaf/lang/Pointer.h>
 
 namespace activemq{
 namespace transport{
@@ -37,68 +34,54 @@ namespace correlator{
     using activemq::commands::Command;
     using activemq::commands::Response;
 
+    class CorrelatorData;
+
     /**
-     * This type of transport filter is responsible for correlating
-     * asynchronous responses with requests.  Non-response messages
-     * are simply sent directly to the CommandListener.  It owns
-     * the transport that it
+     * This type of transport filter is responsible for correlating asynchronous responses
+     * with requests.  Non-response messages are simply sent directly to the CommandListener.
+     * It owns the transport that it wraps.
      */
     class AMQCPP_API ResponseCorrelator : public TransportFilter {
     private:
 
-        /**
-         * The next command id for sent commands.
-         */
-        decaf::util::concurrent::atomic::AtomicInteger nextCommandId;
-
-        /**
-         * Map of request ids to future response objects.
-         */
-        std::map<unsigned int, Pointer<FutureResponse> > requestMap;
-
-        /**
-         * Sync object for accessing the request map.
-         */
-        decaf::util::concurrent::Mutex mapMutex;
-
-        /**
-         * Flag to indicate the closed state.
-         */
-        bool closed;
+        CorrelatorData* impl;
 
     public:
 
         /**
-         * Constructor.
-         * @param next the next transport in the chain
+         * Creates a new ResponseCorrelator transport filter that wraps the given transport.
+         *
+         * @param next
+         *      the next transport in the chain
+         *
+         * @throws NullPointerException if next is NULL.
          */
-        ResponseCorrelator( const Pointer<Transport>& next );
+        ResponseCorrelator(const Pointer<Transport>& next);
 
         virtual ~ResponseCorrelator();
 
     public:  // Transport Methods
 
-        virtual void oneway( const Pointer<Command>& command );
+        virtual void oneway(const Pointer<Command>& command);
 
-        virtual Pointer<Response> request( const Pointer<Command>& command );
+        virtual Pointer<Response> request(const Pointer<Command>& command);
 
-        virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout );
+        virtual Pointer<Response> request(const Pointer<Command>& command, unsigned int timeout);
 
         virtual void start();
 
         virtual void close();
 
         /**
-         * This is called in the context of the nested transport's
-         * reading thread.  In the case of a response object,
-         * updates the request map and notifies those waiting on the
-         * response.  Non-response messages are just delegated to
-         * the command listener.
+         * This is called in the context of the nested transport's reading thread.  In
+         * the case of a response object, updates the request map and notifies those
+         * waiting on the response.  Non-response messages are just delegated to the
+         * command listener.
          *
          * @param command
          *      The received from the nested transport.
          */
-        virtual void onCommand( const Pointer<Command>& command );
+        virtual void onCommand(const Pointer<Command>& command);
 
         /**
          * Event handler for an exception from a command transport.
@@ -108,7 +91,11 @@ namespace correlator{
          * @param ex
          *      The exception that was caught.
          */
-        virtual void onTransportException( Transport* source, const decaf::lang::Exception& ex );
+        virtual void onException(const decaf::lang::Exception& ex);
+
+    private:
+
+        void dispose(const Pointer<decaf::lang::Exception> ex);
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp?rev=1393223&r1=1393222&r2=1393223&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp Tue Oct  2 22:41:41 2012
@@ -406,44 +406,38 @@ void ResponseCorrelatorTest::testOneway(
 ////////////////////////////////////////////////////////////////////////////////
 void ResponseCorrelatorTest::testTransportException(){
 
-    try{
-
-        MyListener listener;
-        Pointer<MyBrokenTransport> transport( new MyBrokenTransport() );
-        ResponseCorrelator correlator( transport );
-        correlator.setTransportListener( &listener );
-        CPPUNIT_ASSERT( transport->listener == &correlator );
+    MyListener listener;
+    Pointer<MyBrokenTransport> transport( new MyBrokenTransport() );
+    ResponseCorrelator correlator( transport );
+    correlator.setTransportListener( &listener );
+    CPPUNIT_ASSERT( transport->listener == &correlator );
 
-        // Give the thread a little time to get up and running.
-        synchronized( &(transport->startedMutex) ) {
-            // Start the transport.
-            correlator.start();
+    // Give the thread a little time to get up and running.
+    synchronized( &(transport->startedMutex) ) {
+        // Start the transport.
+        correlator.start();
 
-            transport->startedMutex.wait();
-        }
+        transport->startedMutex.wait();
+    }
 
-        // Send one request.
-        Pointer<MyCommand> cmd( new MyCommand );
-        try{
-            correlator.request( cmd, 500 );
-            CPPUNIT_ASSERT(false);
-        }catch( IOException& ex ){
-            // Expected.
-        }
+    // Send one request.
+    Pointer<MyCommand> cmd( new MyCommand );
+    try{
+        correlator.request( cmd, 1000 );
+    }catch( IOException& ex ){
+        CPPUNIT_ASSERT(false);
+    }
 
-        // Wait to make sure we get the asynchronous message back.
-        decaf::lang::Thread::sleep( 200 );
+    // Wait to make sure we get the asynchronous message back.
+    decaf::lang::Thread::sleep( 200 );
 
-        // Since our transport relays our original command back at us as a
-        // non-response message, check to make sure we received it and that
-        // it is the original command.
-        CPPUNIT_ASSERT( listener.commands.size() == 0 );
-        CPPUNIT_ASSERT( listener.exCount == 1 );
+    // Since our transport relays our original command back at us as a
+    // non-response message, check to make sure we received it and that
+    // it is the original command.
+    CPPUNIT_ASSERT( listener.commands.size() == 0 );
+    CPPUNIT_ASSERT( listener.exCount == 1 );
 
-        correlator.close();
-    }
-    AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
-    AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+    correlator.close();
 }
 
 ////////////////////////////////////////////////////////////////////////////////