You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/08/16 21:21:46 UTC

svn commit: r686544 - in /activemq/activemq-cpp/trunk/src/main/activemq/transport/filters: FutureResponse.h ResponseCorrelator.cpp

Author: tabish
Date: Sat Aug 16 12:21:45 2008
New Revision: 686544

URL: http://svn.apache.org/viewvc?rev=686544&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-189

Adding timed and un-timed requests to the ResponseCorrelator code via the FutureResponse object; refactored FutureResponse to use a Latch to wait for the response.

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/FutureResponse.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/FutureResponse.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/FutureResponse.h?rev=686544&r1=686543&r2=686544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/FutureResponse.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/FutureResponse.h Sat Aug 16 12:21:45 2008
@@ -20,7 +20,7 @@
 
 #include <activemq/util/Config.h>
 #include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/concurrent/Concurrent.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
 #include <activemq/transport/Response.h>
 
 #include <activemq/exceptions/ActiveMQException.h>
@@ -30,92 +30,48 @@
 namespace filters{
 
     /**
-     * A container that holds a response object.  Since this
-     * object is Synchronizable, callers can wait on this object
-     * and when a response comes in, notify can be called to
-     * inform those waiting that the response is now available.
+     * A container that holds a response object.  Callers of the getResponse
+     * method will block until a response has been received unless they call
+     * the getResponse overload that takes a timeout.
      */
-    class AMQCPP_API FutureResponse : public decaf::util::concurrent::Synchronizable{
+    class AMQCPP_API FutureResponse {
     private:
 
+        mutable decaf::util::concurrent::CountDownLatch responseLatch;
         Response* response;
-        decaf::util::concurrent::Mutex mutex;
 
     public:
 
-        FutureResponse(){
+        FutureResponse() : responseLatch( 1 ) {
             response = NULL;
         }
 
         virtual ~FutureResponse(){}
 
         /**
-         * Locks the object.
-         * @throws ActiveMQException
-         */
-        virtual void lock() throw( exceptions::ActiveMQException ){
-            mutex.lock();
-        }
-
-        /**
-         * Unlocks the object.
-         * @throws ActiveMQException
-         */
-        virtual void unlock() throw( exceptions::ActiveMQException ){
-            mutex.unlock();
-        }
-
-        /**
-         * Waits on a signal from this object, which is generated
-         * by a call to Notify.  Must have this object locked before
-         * calling.
-         * @throws ActiveMQException
-         */
-        virtual void wait() throw( exceptions::ActiveMQException ){
-            mutex.wait();
-        }
-
-        /**
-         * Waits on a signal from this object, which is generated
-         * by a call to Notify.  Must have this object locked before
-         * calling.  This wait will timeout after the specified time
-         * interval.
-         * @param millisecs time in millisecsonds to wait, or WAIT_INIFINITE
-         * @throws ActiveMQException
-         */
-        virtual void wait( unsigned long millisecs )
-            throw( exceptions::ActiveMQException ) {
-            mutex.wait( millisecs );
-        }
-
-        /**
-         * Signals a waiter on this object that it can now wake
-         * up and continue.  Must have this object locked before
-         * calling.
-         * @throws ActiveMQException
+         * Getters for the response property. Infinite Wait.
+         * @return the response object for the request
          */
-        virtual void notify() throw( exceptions::ActiveMQException ){
-            mutex.notify();
+        virtual const Response* getResponse() const{
+            this->responseLatch.await();
+            return response;
         }
-
-        /**
-         * Signals the waiters on this object that it can now wake
-         * up and continue.  Must have this object locked before
-         * calling.
-         * @throws ActiveMQException
-         */
-        virtual void notifyAll() throw( exceptions::ActiveMQException ){
-            mutex.notifyAll();
+        virtual Response* getResponse(){
+            this->responseLatch.await();
+            return response;
         }
 
         /**
-         * Getters for the response property.
+         * Getters for the response property. Timed Wait.
+         * @param timeout - time to wait in milliseconds
          * @return the response object for the request
          */
-        virtual const Response* getResponse() const{
+        virtual const Response* getResponse( unsigned timeout ) const{
+            this->responseLatch.await( timeout );
             return response;
         }
-        virtual Response* getResponse(){
+        virtual Response* getResponse( unsigned int timeout ){
+            this->responseLatch.await( timeout );
             return response;
         }
 
@@ -125,6 +81,7 @@
          */
         virtual void setResponse( Response* response ){
             this->response = response;
+            this->responseLatch.countDown();
         }
 
     };

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp?rev=686544&r1=686543&r2=686544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp Sat Aug 16 12:21:45 2008
@@ -109,8 +109,7 @@
 
         // Add a future response object to the map indexed by this
         // command id.
-        FutureResponse* futureResponse =
-           new FutureResponse();
+        FutureResponse* futureResponse = new FutureResponse();
 
         synchronized( &mapMutex ){
             requestMap[command->getCommandId()] = futureResponse;
@@ -119,17 +118,12 @@
         // Wait to be notified of the response via the futureResponse
         // object.
         Response* response = NULL;
-        synchronized( futureResponse ){
 
-            // Send the request.
-            next->oneway( command );
+        // Send the request.
+        next->oneway( command );
 
-            // Wait for the response to come in.
-            futureResponse->wait( maxResponseWaitTime );
-
-            // Get the response.
-            response = futureResponse->getResponse();
-        }
+        // Get the response.
+        response = futureResponse->getResponse( maxResponseWaitTime );
 
         // Perform cleanup on the map.
         synchronized( &mapMutex ){
@@ -166,8 +160,7 @@
 void ResponseCorrelator::onCommand( Command* command ) {
 
     // Let's see if the incoming command is a response.
-    Response* response =
-       dynamic_cast<Response*>( command );
+    Response* response = dynamic_cast<Response*>( command );
 
     if( response == NULL ){
 
@@ -194,14 +187,8 @@
         FutureResponse* futureResponse = NULL;
         futureResponse = iter->second;
 
-        synchronized( futureResponse ){
-
-            // Set the response property in the future response.
-            futureResponse->setResponse( response );
-
-            // Notify all waiting for this response.
-            futureResponse->notifyAll();
-        }
+        // Set the response property in the future response.
+        futureResponse->setResponse( response );
     }
 }