You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/01/13 20:16:08 UTC

svn commit: r734221 - in /qpid/trunk/qpid/cpp/src/qpid/client: ConnectionHandler.cpp ConnectionHandler.h ConnectionImpl.cpp ConnectionImpl.h ConnectionSettings.h Connector.cpp Connector.h RdmaConnector.cpp SslConnector.cpp

Author: astitcher
Date: Tue Jan 13 11:15:31 2009
New Revision: 734221

URL: http://svn.apache.org/viewvc?rev=734221&view=rev
Log:
Implement heartbeat timeout on client:
- The client shuts down a connection if it receives
  no traffic on it for 2 heartbeat intervals

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
    qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Tue Jan 13 11:15:31 2009
@@ -33,6 +33,13 @@
 using namespace qpid::framing;
 using namespace qpid::framing::connection;
 using qpid::sys::SecurityLayer;
+using qpid::sys::Duration;
+using qpid::sys::TimerTask;
+using qpid::sys::Timer;
+using qpid::sys::AbsTime;
+using qpid::sys::TIME_SEC;
+using qpid::sys::ScopedLock;
+using qpid::sys::Mutex;
 
 namespace {
 const std::string OK("OK");
@@ -60,7 +67,7 @@
 ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v) 
     : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), 
       errorCode(CLOSE_CODE_NORMAL), version(v)
-{    
+{
     insist = true;
 
     ESTABLISHED.insert(FAILED);
@@ -69,14 +76,18 @@
 
     FINISHED.insert(FAILED);
     FINISHED.insert(CLOSED);
-} 
+}
 
 void ConnectionHandler::incoming(AMQFrame& frame)
 {
     if (getState() == CLOSED) {
-        throw Exception("Received frame on closed connection");        
+        throw Exception("Received frame on closed connection");
     }
 
+    if (rcvTimeoutTask) {
+        // Received frame on connection so delay timeout
+        rcvTimeoutTask->restart();
+    }
 
     AMQBody* body = frame.getBody();
     try {
@@ -86,18 +97,18 @@
                 in(frame);
                 break;
             case CLOSING:
-                QPID_LOG(warning, "Ignoring frame while closing connection: " << frame);        
+                QPID_LOG(warning, "Ignoring frame while closing connection: " << frame);
                 break;
             default:
                 throw Exception("Cannot receive frames on non-zero channel until connection is established.");
             }
         }
     }catch(std::exception& e){
-        QPID_LOG(warning, "Closing connection due to " << e.what());        
+        QPID_LOG(warning, "Closing connection due to " << e.what());
         setState(CLOSING);
         errorCode = CLOSE_CODE_FRAMING_ERROR;
         errorText = e.what();
-        proxy.close(501, e.what());    
+        proxy.close(501, e.what());
     }
 }
 
@@ -135,9 +146,9 @@
 
 void ConnectionHandler::heartbeat()
 {
-	// Do nothing - the purpose of heartbeats is just to make sure that there is some
-	// traffic on the connection within the heart beat interval, we check for the
-	// traffic and don't need to do anything in response to heartbeats
+    // Do nothing - the purpose of heartbeats is just to make sure that there is some
+    // traffic on the connection within the heart beat interval, we check for the
+    // traffic and don't need to do anything in response to heartbeats
 }
 
 void ConnectionHandler::checkState(STATES s, const std::string& msg)
@@ -175,7 +186,7 @@
             if (i != mechanisms.begin()) mechlist += SPACE;
             mechlist += (*i)->get<std::string>();
         }
-    }        
+    }
 
     if (!chosenMechanismSupported) {
         fail("Selected mechanism not supported: " + mechanism);
@@ -210,11 +221,10 @@
     // Clip the requested heartbeat to the maximum/minimum offered 
     uint16_t heartbeat = ConnectionSettings::heartbeat;
     heartbeat = heartbeat < heartbeatMin ? heartbeatMin :
-    			heartbeat > heartbeatMax ? heartbeatMax :
-    			heartbeat;    					 
+                heartbeat > heartbeatMax ? heartbeatMax :
+                heartbeat;
+    ConnectionSettings::heartbeat = heartbeat;
     proxy.tuneOk(maxChannels, maxFrameSize, heartbeat);
-    // TODO set connection timeout to be 2x heart beat interval
-    // TODO and set an alarm for it.
     setState(OPENING);
     proxy.open(virtualhost, capabilities, insist);
 }
@@ -279,3 +289,8 @@
 {
     return securityLayer;
 }
+
+void ConnectionHandler::setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask> t)
+{
+    rcvTimeoutTask = t;
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h Tue Jan 13 11:15:31 2009
@@ -35,6 +35,7 @@
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/InputHandler.h"
 #include "qpid/sys/SecurityLayer.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/Url.h"
 #include <memory>
 
@@ -69,6 +70,7 @@
     framing::FieldTable properties;
     std::auto_ptr<Sasl> sasl;
     std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
+    boost::intrusive_ptr<qpid::sys::TimerTask> rcvTimeoutTask;
 
     void checkState(STATES s, const std::string& msg);
 
@@ -109,7 +111,8 @@
     bool isClosed() const;
     bool isClosing() const;
 
-    std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer();    
+    std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer();
+    void setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask>);
 
     CloseListener onClose;
     ErrorListener onError;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Jan 13 11:15:31 2009
@@ -41,6 +41,31 @@
 using namespace qpid::sys;
 using namespace qpid::framing::connection;//for connection error codes
 
+// Get timer singleton  
+Timer& theTimer() {
+    static Mutex timerInitLock;
+    ScopedLock<Mutex> l(timerInitLock);
+
+    static qpid::sys::Timer t;
+    return t;
+}
+
+class HeartbeatTask : public TimerTask {
+    TimeoutHandler& timeout;
+
+    void fire() {
+        // If we ever get here then we have timed out
+        QPID_LOG(debug, "Traffic timeout");
+        timeout.idleIn();
+    }
+
+public:
+    HeartbeatTask(Duration p, TimeoutHandler& t) :
+        TimerTask(p),
+        timeout(t)
+    {}
+};
+
 ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
     : Bounds(settings.maxFrameSize * settings.bounds),
       handler(settings, v),
@@ -110,6 +135,16 @@
     connector->connect(host, port);
     connector->init();
     handler.waitForOpen();
+ 
+    // Enable heartbeat if requested
+    uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat;
+    if (heartbeat) {
+        // Set connection timeout to be 2x heart beat interval and setup timer
+        heartbeatTask = new HeartbeatTask(heartbeat * 2 * TIME_SEC, *this);
+        handler.setRcvTimeoutTask(heartbeatTask);
+        theTimer().add(heartbeatTask);
+    }
+ 
     //enable security layer if one has been negotiated:
     std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer();
     if (securityLayer.get()) {
@@ -124,7 +159,7 @@
 
 void ConnectionImpl::idleIn()
 {
-    close();
+    connector->abort();
 }
 
 void ConnectionImpl::idleOut()
@@ -136,6 +171,9 @@
 void ConnectionImpl::close()
 {
     if (!handler.isOpen()) return;
+    if (heartbeatTask) {
+        heartbeatTask->cancel();
+    }
     handler.close();
     closed(CLOSE_CODE_NORMAL, "Closed by client");
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Tue Jan 13 11:15:31 2009
@@ -61,6 +61,8 @@
     uint16_t nextChannel;
     sys::Mutex lock;
 
+    boost::intrusive_ptr<qpid::sys::TimerTask> heartbeatTask;
+
     template <class F> void closeInternal(const F&);
 
     void incoming(framing::AMQFrame& frame);    

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h Tue Jan 13 11:15:31 2009
@@ -89,8 +89,7 @@
      */
     std::string locale;
     /**
-     * Allows a heartbeat frequency to be specified (this feature is
-     * not yet implemented).
+     * Allows a heartbeat frequency to be specified
      */
     uint16_t heartbeat;
     /**

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Tue Jan 13 11:15:31 2009
@@ -128,6 +128,7 @@
     void init();
     void close();
     void send(framing::AMQFrame& frame);
+    void abort();
 
     void setInputHandler(framing::InputHandler* handler);
     void setShutdownHandler(sys::ShutdownHandler* handler);
@@ -233,6 +234,10 @@
     closeInternal();
 }
 
+void TCPConnector::abort() {
+    aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+}
+
 void TCPConnector::setInputHandler(InputHandler* handler){
     input = handler;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Tue Jan 13 11:15:31 2009
@@ -64,6 +64,7 @@
     virtual void init() {};
     virtual void close() = 0;
     virtual void send(framing::AMQFrame& frame) = 0;
+    virtual void abort() = 0;
 
     virtual void setInputHandler(framing::InputHandler* handler) = 0;
     virtual void setShutdownHandler(sys::ShutdownHandler* handler) = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Tue Jan 13 11:15:31 2009
@@ -105,6 +105,7 @@
     void connect(const std::string& host, int port);
     void close();
     void send(framing::AMQFrame& frame);
+    void abort() {} // TODO: need to fix this for heartbeat timeouts to work
 
     void setInputHandler(framing::InputHandler* handler);
     void setShutdownHandler(sys::ShutdownHandler* handler);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp Tue Jan 13 11:15:31 2009
@@ -122,6 +122,7 @@
     void init();
     void close();
     void send(framing::AMQFrame& frame);
+    void abort() {} // TODO: Need to fix for heartbeat timeouts to work
 
     void setInputHandler(framing::InputHandler* handler);
     void setShutdownHandler(sys::ShutdownHandler* handler);
@@ -372,8 +373,6 @@
     handleClosed();
 }
 
-// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
-// will never be called
 void SslConnector::run(){
     // Keep the connection impl in memory until run() completes.
     boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();