You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/08/16 21:21:46 UTC
svn commit: r686544 - in
/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters:
FutureResponse.h ResponseCorrelator.cpp
Author: tabish
Date: Sat Aug 16 12:21:45 2008
New Revision: 686544
URL: http://svn.apache.org/viewvc?rev=686544&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-189
Adding timed and untimed requests to the ResponseCorrelator code via the FutureResponse object; refactored FutureResponse to use a CountDownLatch to wait for the response.
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/FutureResponse.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/FutureResponse.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/FutureResponse.h?rev=686544&r1=686543&r2=686544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/FutureResponse.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/FutureResponse.h Sat Aug 16 12:21:45 2008
@@ -20,7 +20,7 @@
#include <activemq/util/Config.h>
#include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/concurrent/Concurrent.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/transport/Response.h>
#include <activemq/exceptions/ActiveMQException.h>
@@ -30,92 +30,48 @@
namespace filters{
/**
- * A container that holds a response object. Since this
- * object is Synchronizable, callers can wait on this object
- * and when a response comes in, notify can be called to
- * inform those waiting that the response is now available.
+ * A container that holds a response object. Callers of the getResponse
+ * method will block until a response has been received unless they call
+ * the getResponse overload that takes a timeout.
*/
- class AMQCPP_API FutureResponse : public decaf::util::concurrent::Synchronizable{
+ class AMQCPP_API FutureResponse {
private:
+ mutable decaf::util::concurrent::CountDownLatch responseLatch;
Response* response;
- decaf::util::concurrent::Mutex mutex;
public:
- FutureResponse(){
+ FutureResponse() : responseLatch( 1 ) {
response = NULL;
}
virtual ~FutureResponse(){}
/**
- * Locks the object.
- * @throws ActiveMQException
- */
- virtual void lock() throw( exceptions::ActiveMQException ){
- mutex.lock();
- }
-
- /**
- * Unlocks the object.
- * @throws ActiveMQException
- */
- virtual void unlock() throw( exceptions::ActiveMQException ){
- mutex.unlock();
- }
-
- /**
- * Waits on a signal from this object, which is generated
- * by a call to Notify. Must have this object locked before
- * calling.
- * @throws ActiveMQException
- */
- virtual void wait() throw( exceptions::ActiveMQException ){
- mutex.wait();
- }
-
- /**
- * Waits on a signal from this object, which is generated
- * by a call to Notify. Must have this object locked before
- * calling. This wait will timeout after the specified time
- * interval.
- * @param millisecs time in millisecsonds to wait, or WAIT_INIFINITE
- * @throws ActiveMQException
- */
- virtual void wait( unsigned long millisecs )
- throw( exceptions::ActiveMQException ) {
- mutex.wait( millisecs );
- }
-
- /**
- * Signals a waiter on this object that it can now wake
- * up and continue. Must have this object locked before
- * calling.
- * @throws ActiveMQException
+ * Getters for the response property. Infinite Wait.
+ * @return the response object for the request
*/
- virtual void notify() throw( exceptions::ActiveMQException ){
- mutex.notify();
+ virtual const Response* getResponse() const{
+ this->responseLatch.await();
+ return response;
}
-
- /**
- * Signals the waiters on this object that it can now wake
- * up and continue. Must have this object locked before
- * calling.
- * @throws ActiveMQException
- */
- virtual void notifyAll() throw( exceptions::ActiveMQException ){
- mutex.notifyAll();
+ virtual Response* getResponse(){
+ this->responseLatch.await();
+ return response;
}
/**
- * Getters for the response property.
+ * Getters for the response property. Timed Wait.
+ * @param timeout - time to wait in milliseconds
* @return the response object for the request
*/
- virtual const Response* getResponse() const{
+ virtual const Response* getResponse( unsigned timeout ) const{
+ this->responseLatch.await( timeout );
return response;
}
- virtual Response* getResponse(){
+ virtual Response* getResponse( unsigned int timeout ){
+ this->responseLatch.await( timeout );
return response;
}
@@ -125,6 +81,7 @@
*/
virtual void setResponse( Response* response ){
this->response = response;
+ this->responseLatch.countDown();
}
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp?rev=686544&r1=686543&r2=686544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp Sat Aug 16 12:21:45 2008
@@ -109,8 +109,7 @@
// Add a future response object to the map indexed by this
// command id.
- FutureResponse* futureResponse =
- new FutureResponse();
+ FutureResponse* futureResponse = new FutureResponse();
synchronized( &mapMutex ){
requestMap[command->getCommandId()] = futureResponse;
@@ -119,17 +118,12 @@
// Wait to be notified of the response via the futureResponse
// object.
Response* response = NULL;
- synchronized( futureResponse ){
- // Send the request.
- next->oneway( command );
+ // Send the request.
+ next->oneway( command );
- // Wait for the response to come in.
- futureResponse->wait( maxResponseWaitTime );
-
- // Get the response.
- response = futureResponse->getResponse();
- }
+ // Get the response.
+ response = futureResponse->getResponse( maxResponseWaitTime );
// Perform cleanup on the map.
synchronized( &mapMutex ){
@@ -166,8 +160,7 @@
void ResponseCorrelator::onCommand( Command* command ) {
// Let's see if the incoming command is a response.
- Response* response =
- dynamic_cast<Response*>( command );
+ Response* response = dynamic_cast<Response*>( command );
if( response == NULL ){
@@ -194,14 +187,8 @@
FutureResponse* futureResponse = NULL;
futureResponse = iter->second;
- synchronized( futureResponse ){
-
- // Set the response property in the future response.
- futureResponse->setResponse( response );
-
- // Notify all waiting for this response.
- futureResponse->notifyAll();
- }
+ // Set the response property in the future response.
+ futureResponse->setResponse( response );
}
}