You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/04/19 04:00:17 UTC
svn commit: r1469661 - in /qpid/trunk/qpid/cpp: include/qpid/sys/
src/qpid/broker/ src/qpid/ha/ src/qpid/sys/posix/ src/tests/
Author: aconway
Date: Fri Apr 19 02:00:16 2013
New Revision: 1469661
URL: http://svn.apache.org/r1469661
Log:
QPID-4748: Consistent handling of durations in broker configuration, allowing sub-second intervals.
Provides string conversion for sys::Duration, allowing intervals to be expressed like this:
10.5 - value in seconds, backward compatible.
10.5s - value in seconds
10.5ms - value in milliseconds
10.5us - value in microseconds
10.5ns - value in nanoseconds
Converted the folllowing broker options to Duration:
mgmtPubInterval, queueCleanInterval, linkMaintenanceInterval, linkHeartbeatInterval
Did not convert: maxNegotiateTime. This is expressed in milliseconds so it would not be
backward compatible to make it a Duration.
Modified:
qpid/trunk/qpid/cpp/include/qpid/sys/Time.h
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
qpid/trunk/qpid/cpp/src/qpid/sys/posix/Time.cpp
qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp
Modified: qpid/trunk/qpid/cpp/include/qpid/sys/Time.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/sys/Time.h?rev=1469661&r1=1469660&r2=1469661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/sys/Time.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/sys/Time.h Fri Apr 19 02:00:16 2013
@@ -125,6 +125,7 @@ public:
};
std::ostream& operator << (std::ostream&, const Duration&);
+std::istream& operator >> (std::istream&, Duration&);
inline AbsTime now() { return AbsTime::now(); }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1469661&r1=1469660&r2=1469661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Apr 19 02:00:16 2013
@@ -126,8 +126,8 @@ Broker::Options::Options(const std::stri
connectionBacklog(10),
enableMgmt(1),
mgmtPublish(1),
- mgmtPubInterval(10),
- queueCleanInterval(60*10),//10 minutes
+ mgmtPubInterval(10*sys::TIME_SEC),
+ queueCleanInterval(60*sys::TIME_SEC*10),//10 minutes
auth(SaslAuthenticator::available()),
realm("QPID"),
replayFlushLimit(0),
@@ -143,8 +143,8 @@ Broker::Options::Options(const std::stri
queueThresholdEventRatio(80),
defaultMsgGroup("qpid.no-group"),
timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
- linkMaintenanceInterval(2),
- linkHeartbeatInterval(120),
+ linkMaintenanceInterval(2*sys::TIME_SEC),
+ linkHeartbeatInterval(120*sys::TIME_SEC),
maxNegotiateTime(10000) // 10s
{
int c = sys::SystemInfo::concurrency();
@@ -168,8 +168,6 @@ Broker::Options::Options(const std::stri
("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)")
("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2")
("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1")
- // FIXME aconway 2012-02-13: consistent treatment of values in SECONDS
- // allow sub-second intervals.
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
"Interval between attempts to purge any expired messages from queues")
@@ -232,7 +230,7 @@ Broker::Broker(const Broker::Options& co
if (conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPublish,
- conf.mgmtPubInterval, this, conf.workerThreads + 3);
+ conf.mgmtPubInterval/sys::TIME_SEC, this, conf.workerThreads + 3);
managementAgent->setName("apache.org", "qpidd");
_qmf::Package packageInitializer(managementAgent.get());
@@ -244,7 +242,7 @@ Broker::Broker(const Broker::Options& co
mgmtObject->set_port(conf.port);
mgmtObject->set_workerThreads(conf.workerThreads);
mgmtObject->set_connBacklog(conf.connectionBacklog);
- mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval);
+ mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval/sys::TIME_SEC);
mgmtObject->set_mgmtPublish(conf.mgmtPublish);
mgmtObject->set_version(qpid::version);
if (dataDir.isEnabled())
@@ -356,7 +354,7 @@ Broker::Broker(const Broker::Options& co
}
if (conf.queueCleanInterval) {
- queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
+ queueCleaner.start(conf.queueCleanInterval);
}
if (!conf.knownHosts.empty() && conf.knownHosts != knownHostsNone) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1469661&r1=1469660&r2=1469661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Apr 19 02:00:16 2013
@@ -98,8 +98,8 @@ class Broker : public sys::Runnable, pub
int connectionBacklog;
bool enableMgmt;
bool mgmtPublish;
- uint16_t mgmtPubInterval;
- uint16_t queueCleanInterval;
+ sys::Duration mgmtPubInterval;
+ sys::Duration queueCleanInterval;
bool auth;
std::string realm;
size_t replayFlushLimit;
@@ -116,8 +116,8 @@ class Broker : public sys::Runnable, pub
uint16_t queueThresholdEventRatio;
std::string defaultMsgGroup;
bool timestampRcvMsgs;
- double linkMaintenanceInterval; // FIXME aconway 2012-02-13: consistent parsing of SECONDS values.
- uint16_t linkHeartbeatInterval;
+ sys::Duration linkMaintenanceInterval;
+ sys::Duration linkHeartbeatInterval;
uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation
std::string fedTag;
@@ -350,7 +350,6 @@ class Broker : public sys::Runnable, pub
QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const;
QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&);
- QPID_BROKER_EXTERN uint16_t getLinkHearbeatInterval() { return config.linkHeartbeatInterval; }
/** Information identifying this system */
boost::shared_ptr<const System> getSystem() const { return systemObject; }
friend class StatusCheckThread;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1469661&r1=1469660&r2=1469661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Fri Apr 19 02:00:16 2013
@@ -34,6 +34,7 @@
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/sys/SecurityLayer.h"
+#include "qpid/sys/Time.h"
#include "qpid/broker/AclModule.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qmf/org/apache/qpid/broker/EventClientConnectFail.h"
@@ -401,7 +402,9 @@ void ConnectionHandler::Handler::tune(ui
// this method is only ever called when this Connection
// is a federation link where this Broker is acting as
// a client to another Broker
- uint16_t hb = std::min(connection.getBroker().getOptions().linkHeartbeatInterval, heartbeatMax);
+ sys::Duration interval = connection.getBroker().getOptions().linkHeartbeatInterval;
+ uint16_t intervalSec = static_cast<uint16_t>(interval/sys::TIME_SEC);
+ uint16_t hb = std::min(intervalSec, heartbeatMax);
connection.setHeartbeat(hb);
connection.startLinkHeartbeatTimeoutTask();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1469661&r1=1469660&r2=1469661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Fri Apr 19 02:00:16 2013
@@ -61,8 +61,7 @@ namespace {
struct LinkTimerTask : public sys::TimerTask {
LinkTimerTask(Link& l, sys::Timer& t)
- : TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval*
- sys::TIME_SEC),
+ : TimerTask(l.getBroker()->getOptions().linkMaintenanceInterval,
"Link retry timer"),
link(l), timer(t) {}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1469661&r1=1469660&r2=1469661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Fri Apr 19 02:00:16 2013
@@ -54,7 +54,7 @@ Backup::Backup(HaBroker& hb, const Setti
haBroker(hb), broker(hb.getBroker()), settings(s),
statusCheck(
new StatusCheck(
- logPrefix, broker.getLinkHearbeatInterval(), hb.getBrokerInfo()))
+ logPrefix, broker.getOptions().linkHeartbeatInterval, hb.getBrokerInfo()))
{
// Set link properties to tag outgoing links.
framing::FieldTable linkProperties = broker.getLinkClientProperties();
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1469661&r1=1469660&r2=1469661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Apr 19 02:00:16 2013
@@ -104,8 +104,7 @@ Primary::Primary(HaBroker& hb, const Bro
backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards
}
// Set timeout for expected brokers to connect and become ready.
- sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC));
- sys::AbsTime deadline(sys::now(), timeout);
+ sys::AbsTime deadline(sys::now(), hb.getSettings().backupTimeout);
timerTask = new ExpectedBackupTimerTask(*this, deadline);
hb.getBroker().getTimer().add(timerTask);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h?rev=1469661&r1=1469660&r2=1469661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h Fri Apr 19 02:00:16 2013
@@ -23,6 +23,7 @@
*/
#include "types.h"
+#include "qpid/sys/Time.h"
#include "qpid/sys/IntegerTypes.h"
#include <string>
@@ -36,7 +37,7 @@ class Settings
{
public:
Settings() : cluster(false), queueReplication(false),
- replicateDefault(NONE), backupTimeout(5),
+ replicateDefault(NONE), backupTimeout(5*sys::TIME_SEC),
flowMessages(100), flowBytes(0)
{}
@@ -46,7 +47,7 @@ class Settings
std::string brokerUrl;
Enum<ReplicateLevel> replicateDefault;
std::string username, password, mechanism;
- double backupTimeout;
+ sys::Duration backupTimeout;
uint32_t flowMessages, flowBytes;
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1469661&r1=1469660&r2=1469661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp Fri Apr 19 02:00:16 2013
@@ -46,21 +46,21 @@ class StatusCheckThread : public sys::Ru
private:
Url url;
StatusCheck& statusCheck;
- uint16_t linkHeartbeatInterval;
+ sys::Duration linkHeartbeatInterval;
BrokerInfo brokerInfo;
};
void StatusCheckThread::run() {
QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url);
- Variant::Map options, clientProperties;
- clientProperties = brokerInfo.asMap(); // Detect self connections.
- clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups.
-
- options["client-properties"] = clientProperties;
- options["heartbeat"] = statusCheck.linkHeartbeatInterval;
- Connection c(url.str(), options);
-
try {
+ Variant::Map options, clientProperties;
+ clientProperties = brokerInfo.asMap(); // Detect self connections.
+ clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups.
+
+ options["client-properties"] = clientProperties;
+ options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC;
+ Connection c(url.str(), options);
+
c.open();
Session session = c.createSession();
messaging::Address responses("#;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.replicate':none}}}}");
@@ -78,7 +78,7 @@ void StatusCheckThread::run() {
content["_object_id"] = oid;
encode(content, request);
s.send(request);
- Message response = r.fetch(statusCheck.linkHeartbeatInterval*Duration::SECOND);
+ Message response = r.fetch(messaging::Duration(linkHeartbeatInterval/TIME_MSEC));
session.acknowledge();
Variant::List contentIn;
decode(response, contentIn);
@@ -98,7 +98,7 @@ void StatusCheckThread::run() {
delete this;
}
-StatusCheck::StatusCheck(const string& lp, uint16_t lh, const BrokerInfo& self)
+StatusCheck::StatusCheck(const string& lp, sys::Duration lh, const BrokerInfo& self)
: logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self)
{}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1469661&r1=1469660&r2=1469661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h Fri Apr 19 02:00:16 2013
@@ -27,6 +27,7 @@
#include "qpid/sys/Thread.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Time.h"
#include <vector>
namespace qpid {
@@ -50,7 +51,7 @@ namespace ha {
class StatusCheck
{
public:
- StatusCheck(const std::string& logPrefix, uint16_t linkHeartbeatInteval, const BrokerInfo& self);
+ StatusCheck(const std::string& logPrefix, sys::Duration linkHeartbeatInterval, const BrokerInfo& self);
~StatusCheck();
void setUrl(const Url&);
bool canPromote();
@@ -62,7 +63,7 @@ class StatusCheck
sys::Mutex lock;
std::vector<sys::Thread> threads;
bool promote;
- uint16_t linkHeartbeatInterval;
+ sys::Duration linkHeartbeatInterval;
BrokerInfo brokerInfo;
friend class StatusCheckThread;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/Time.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Time.cpp?rev=1469661&r1=1469660&r2=1469661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/Time.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/Time.cpp Fri Apr 19 02:00:16 2013
@@ -23,11 +23,13 @@
#include "qpid/sys/Time.h"
#include <ostream>
+#include <istream>
#include <time.h>
#include <stdio.h>
#include <sys/time.h>
#include <unistd.h>
#include <iomanip>
+#include <cctype>
namespace {
int64_t max_abstime() { return std::numeric_limits<int64_t>::max(); }
@@ -73,19 +75,36 @@ struct timespec& toTimespec(struct times
return ts;
}
-struct timeval& toTimeval(struct timeval& tv, const Duration& t) {
- Duration secs = t / TIME_SEC;
- tv.tv_sec = (secs > TIME_T_MAX) ? TIME_T_MAX : static_cast<time_t>(secs);
- tv.tv_usec = static_cast<suseconds_t>((t%TIME_SEC)/TIME_USEC);
- return tv;
-}
-
Duration toTime(const struct timespec& ts) {
return ts.tv_sec*TIME_SEC + ts.tv_nsec;
}
std::ostream& operator<<(std::ostream& o, const Duration& d) {
- return o << int64_t(d) << "ns";
+ if (d >= TIME_SEC) return o << (double(d)/TIME_SEC) << "s";
+ if (d >= TIME_MSEC) return o << (double(d)/TIME_MSEC) << "ms";
+ if (d >= TIME_USEC) return o << (double(d)/TIME_USEC) << "us";
+ return o << int64_t(d) << "ns";
+}
+
+std::istream& operator>>(std::istream& i, Duration& d) {
+ // Don't throw, let the istream throw if it's configured to do so.
+ double number;
+ i >> number;
+ if (i.fail()) return i;
+
+ if (i.eof() || std::isspace(i.peek())) // No suffix
+ d = number*TIME_SEC;
+ else {
+ std::string suffix;
+ i >> suffix;
+ if (i.fail()) return i;
+ if (suffix.compare("s") == 0) d = number*TIME_SEC;
+ else if (suffix.compare("ms") == 0) d = number*TIME_MSEC;
+ else if (suffix.compare("us") == 0) d = number*TIME_USEC;
+ else if (suffix.compare("ns") == 0) d = number*TIME_NSEC;
+ else i.setstate(std::ios::failbit);
+ }
+ return i;
}
namespace {
Modified: qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp?rev=1469661&r1=1469660&r2=1469661&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp Fri Apr 19 02:00:16 2013
@@ -21,6 +21,7 @@
*/
#include "qpid/sys/Timer.h"
#include "qpid/sys/Monitor.h"
+#include "qpid/Options.h"
#include "unit_test.h"
#include <math.h>
#include <iostream>
@@ -127,6 +128,49 @@ QPID_AUTO_TEST_CASE(testGeneral)
dynamic_pointer_cast<TestTask>(task4)->check(2);
}
+std::string toString(Duration d) { return boost::lexical_cast<std::string>(d); }
+Duration fromString(const std::string& str) { return boost::lexical_cast<Duration>(str); }
+
+QPID_AUTO_TEST_CASE(testOstreamInOut) {
+ std::string empty;
+ BOOST_CHECK_EQUAL(toString(Duration(TIME_SEC)), "1s");
+ BOOST_CHECK_EQUAL(toString(Duration(TIME_SEC*123.4)), "123.4s");
+ BOOST_CHECK_EQUAL(toString(Duration(TIME_MSEC*123.4)), "123.4ms");
+ BOOST_CHECK_EQUAL(toString(Duration(TIME_USEC*123.4)), "123.4us");
+ BOOST_CHECK_EQUAL(toString(Duration(TIME_NSEC*123)), "123ns");
+
+ BOOST_CHECK_EQUAL(fromString("123.4"), Duration(TIME_SEC*123.4));
+ BOOST_CHECK_EQUAL(fromString("123.4s"), Duration(TIME_SEC*123.4));
+ BOOST_CHECK_EQUAL(fromString("123ms"), Duration(TIME_MSEC*123));
+ BOOST_CHECK_EQUAL(fromString("123us"), Duration(TIME_USEC*123));
+ BOOST_CHECK_EQUAL(fromString("123ns"), Duration(TIME_NSEC*123));
+
+ Duration d = 0;
+ std::istringstream i;
+ std::string s;
+
+ i.str("123x");
+ i >> d;
+ BOOST_CHECK(i.fail());
+ BOOST_CHECK_EQUAL(d, 0);
+ BOOST_CHECK_EQUAL(i.str(), "123x");
+
+ i.str("xxx");
+ i >> d;
+ BOOST_CHECK(i.fail());
+ BOOST_CHECK_EQUAL(d, 0);
+ BOOST_CHECK_EQUAL(i.str(), "xxx");
+}
+
+QPID_AUTO_TEST_CASE(testOptionParse) {
+ Options opts;
+ Duration interval;
+ opts.addOptions()("interval", optValue(interval, "I"), "blah");
+ const char *args[] = { "fakeexe", "--interval", "123.4" };
+ opts.parse(sizeof(args)/sizeof(args[0]), args);
+ BOOST_CHECK_EQUAL(interval, Duration(TIME_SEC * 123.4));
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org