You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/01/13 20:16:08 UTC
svn commit: r734221 - in /qpid/trunk/qpid/cpp/src/qpid/client:
ConnectionHandler.cpp ConnectionHandler.h ConnectionImpl.cpp
ConnectionImpl.h ConnectionSettings.h Connector.cpp Connector.h
RdmaConnector.cpp SslConnector.cpp
Author: astitcher
Date: Tue Jan 13 11:15:31 2009
New Revision: 734221
URL: http://svn.apache.org/viewvc?rev=734221&view=rev
Log:
Implement heartbeat timeout on client:
- The client shuts down a connection if it receives no traffic
on it within a timeout of 2 heartbeat intervals
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Tue Jan 13 11:15:31 2009
@@ -33,6 +33,13 @@
using namespace qpid::framing;
using namespace qpid::framing::connection;
using qpid::sys::SecurityLayer;
+using qpid::sys::Duration;
+using qpid::sys::TimerTask;
+using qpid::sys::Timer;
+using qpid::sys::AbsTime;
+using qpid::sys::TIME_SEC;
+using qpid::sys::ScopedLock;
+using qpid::sys::Mutex;
namespace {
const std::string OK("OK");
@@ -60,7 +67,7 @@
ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v)
: StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler),
errorCode(CLOSE_CODE_NORMAL), version(v)
-{
+{
insist = true;
ESTABLISHED.insert(FAILED);
@@ -69,14 +76,18 @@
FINISHED.insert(FAILED);
FINISHED.insert(CLOSED);
-}
+}
void ConnectionHandler::incoming(AMQFrame& frame)
{
if (getState() == CLOSED) {
- throw Exception("Received frame on closed connection");
+ throw Exception("Received frame on closed connection");
}
+ if (rcvTimeoutTask) {
+ // Received frame on connection so delay timeout
+ rcvTimeoutTask->restart();
+ }
AMQBody* body = frame.getBody();
try {
@@ -86,18 +97,18 @@
in(frame);
break;
case CLOSING:
- QPID_LOG(warning, "Ignoring frame while closing connection: " << frame);
+ QPID_LOG(warning, "Ignoring frame while closing connection: " << frame);
break;
default:
throw Exception("Cannot receive frames on non-zero channel until connection is established.");
}
}
}catch(std::exception& e){
- QPID_LOG(warning, "Closing connection due to " << e.what());
+ QPID_LOG(warning, "Closing connection due to " << e.what());
setState(CLOSING);
errorCode = CLOSE_CODE_FRAMING_ERROR;
errorText = e.what();
- proxy.close(501, e.what());
+ proxy.close(501, e.what());
}
}
@@ -135,9 +146,9 @@
void ConnectionHandler::heartbeat()
{
- // Do nothing - the purpose of heartbeats is just to make sure that there is some
- // traffic on the connection within the heart beat interval, we check for the
- // traffic and don't need to do anything in response to heartbeats
+ // Do nothing - the purpose of heartbeats is just to make sure that there is some
+ // traffic on the connection within the heart beat interval, we check for the
+ // traffic and don't need to do anything in response to heartbeats
}
void ConnectionHandler::checkState(STATES s, const std::string& msg)
@@ -175,7 +186,7 @@
if (i != mechanisms.begin()) mechlist += SPACE;
mechlist += (*i)->get<std::string>();
}
- }
+ }
if (!chosenMechanismSupported) {
fail("Selected mechanism not supported: " + mechanism);
@@ -210,11 +221,10 @@
// Clip the requested heartbeat to the maximum/minimum offered
uint16_t heartbeat = ConnectionSettings::heartbeat;
heartbeat = heartbeat < heartbeatMin ? heartbeatMin :
- heartbeat > heartbeatMax ? heartbeatMax :
- heartbeat;
+ heartbeat > heartbeatMax ? heartbeatMax :
+ heartbeat;
+ ConnectionSettings::heartbeat = heartbeat;
proxy.tuneOk(maxChannels, maxFrameSize, heartbeat);
- // TODO set connection timeout to be 2x heart beat interval
- // TODO and set an alarm for it.
setState(OPENING);
proxy.open(virtualhost, capabilities, insist);
}
@@ -279,3 +289,8 @@
{
return securityLayer;
}
+
+void ConnectionHandler::setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask> t)
+{
+ rcvTimeoutTask = t;
+}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h Tue Jan 13 11:15:31 2009
@@ -35,6 +35,7 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/InputHandler.h"
#include "qpid/sys/SecurityLayer.h"
+#include "qpid/sys/Timer.h"
#include "qpid/Url.h"
#include <memory>
@@ -69,6 +70,7 @@
framing::FieldTable properties;
std::auto_ptr<Sasl> sasl;
std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
+ boost::intrusive_ptr<qpid::sys::TimerTask> rcvTimeoutTask;
void checkState(STATES s, const std::string& msg);
@@ -109,7 +111,8 @@
bool isClosed() const;
bool isClosing() const;
- std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer();
+ std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer();
+ void setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask>);
CloseListener onClose;
ErrorListener onError;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Jan 13 11:15:31 2009
@@ -41,6 +41,31 @@
using namespace qpid::sys;
using namespace qpid::framing::connection;//for connection error codes
+// Get timer singleton
+Timer& theTimer() {
+ static Mutex timerInitLock;
+ ScopedLock<Mutex> l(timerInitLock);
+
+ static qpid::sys::Timer t;
+ return t;
+}
+
+class HeartbeatTask : public TimerTask {
+ TimeoutHandler& timeout;
+
+ void fire() {
+ // If we ever get here then we have timed out
+ QPID_LOG(debug, "Traffic timeout");
+ timeout.idleIn();
+ }
+
+public:
+ HeartbeatTask(Duration p, TimeoutHandler& t) :
+ TimerTask(p),
+ timeout(t)
+ {}
+};
+
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
@@ -110,6 +135,16 @@
connector->connect(host, port);
connector->init();
handler.waitForOpen();
+
+ // Enable heartbeat if requested
+ uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat;
+ if (heartbeat) {
+ // Set connection timeout to be 2x heart beat interval and setup timer
+ heartbeatTask = new HeartbeatTask(heartbeat * 2 * TIME_SEC, *this);
+ handler.setRcvTimeoutTask(heartbeatTask);
+ theTimer().add(heartbeatTask);
+ }
+
//enable security layer if one has been negotiated:
std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer();
if (securityLayer.get()) {
@@ -124,7 +159,7 @@
void ConnectionImpl::idleIn()
{
- close();
+ connector->abort();
}
void ConnectionImpl::idleOut()
@@ -136,6 +171,9 @@
void ConnectionImpl::close()
{
if (!handler.isOpen()) return;
+ if (heartbeatTask) {
+ heartbeatTask->cancel();
+ }
handler.close();
closed(CLOSE_CODE_NORMAL, "Closed by client");
}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Tue Jan 13 11:15:31 2009
@@ -61,6 +61,8 @@
uint16_t nextChannel;
sys::Mutex lock;
+ boost::intrusive_ptr<qpid::sys::TimerTask> heartbeatTask;
+
template <class F> void closeInternal(const F&);
void incoming(framing::AMQFrame& frame);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h Tue Jan 13 11:15:31 2009
@@ -89,8 +89,7 @@
*/
std::string locale;
/**
- * Allows a heartbeat frequency to be specified (this feature is
- * not yet implemented).
+ * Allows a heartbeat frequency to be specified
*/
uint16_t heartbeat;
/**
Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Tue Jan 13 11:15:31 2009
@@ -128,6 +128,7 @@
void init();
void close();
void send(framing::AMQFrame& frame);
+ void abort();
void setInputHandler(framing::InputHandler* handler);
void setShutdownHandler(sys::ShutdownHandler* handler);
@@ -233,6 +234,10 @@
closeInternal();
}
+void TCPConnector::abort() {
+ aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+}
+
void TCPConnector::setInputHandler(InputHandler* handler){
input = handler;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Tue Jan 13 11:15:31 2009
@@ -64,6 +64,7 @@
virtual void init() {};
virtual void close() = 0;
virtual void send(framing::AMQFrame& frame) = 0;
+ virtual void abort() = 0;
virtual void setInputHandler(framing::InputHandler* handler) = 0;
virtual void setShutdownHandler(sys::ShutdownHandler* handler) = 0;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Tue Jan 13 11:15:31 2009
@@ -105,6 +105,7 @@
void connect(const std::string& host, int port);
void close();
void send(framing::AMQFrame& frame);
+ void abort() {} // TODO: need to fix this for heartbeat timeouts to work
void setInputHandler(framing::InputHandler* handler);
void setShutdownHandler(sys::ShutdownHandler* handler);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=734221&r1=734220&r2=734221&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp Tue Jan 13 11:15:31 2009
@@ -122,6 +122,7 @@
void init();
void close();
void send(framing::AMQFrame& frame);
+ void abort() {} // TODO: Need to fix for heartbeat timeouts to work
void setInputHandler(framing::InputHandler* handler);
void setShutdownHandler(sys::ShutdownHandler* handler);
@@ -372,8 +373,6 @@
handleClosed();
}
-// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
-// will never be called
void SslConnector::run(){
// Keep the connection impl in memory until run() completes.
boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();