You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/05/19 23:18:54 UTC
svn commit: r776463 - in /qpid/trunk/qpid/cpp/src/qpid: cluster/Cluster.cpp
cluster/Multicaster.cpp cluster/Multicaster.h cluster/OutputInterceptor.cpp
sys/LatencyTracker.h
Author: aconway
Date: Tue May 19 21:18:52 2009
New Revision: 776463
URL: http://svn.apache.org/viewvc?rev=776463&view=rev
Log:
Instrumentation for measuring latencies.
Compiled out of normal builds, enable with -DQPID_LATENCY_TRACKER.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=776463&r1=776462&r2=776463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue May 19 21:18:52 2009
@@ -109,6 +109,7 @@
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
#include "qpid/framing/ClusterUpdateRequestBody.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
#include "qpid/management/IdAllocator.h"
@@ -294,23 +295,27 @@
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
+ LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish());
deliverEvent(e);
}
+LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");)
+LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");)
+
void Cluster::deliverEvent(const Event& e) {
- LATENCY_START(Event, "enqueue event", e.getData());
+ LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());)
deliverEventQueue.push(e);
}
void Cluster::deliverFrame(const EventFrame& e) {
- LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody());
+ LATENCY_TRACK(frameQueueLatencyTracker.start(e.frame.getBody()));
deliverFrameQueue.push(e);
}
// Handler for deliverEventQueue.
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
- LATENCY_STAGE(Event, "dequeue event", e.getData());
+ LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData()));
QPID_LOG(trace, *this << " DLVR: " << e);
if (e.isCluster()) {
EventFrame ef(e, e.getFrame());
@@ -329,7 +334,6 @@
}
else // Discard connection events if discarding is set.
QPID_LOG(trace, *this << " DROP: " << e);
- LATENCY_END(Event, "processed event", e.getData());
}
void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
@@ -337,18 +341,22 @@
error.error(connection, type, map.getFrameSeq(), map.getMembers());
}
+LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
+
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& e) {
- LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody());
+ LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
+ LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
Mutex::ScopedLock l(lock);
// Process each frame through the error checker.
error.delivered(e);
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
- LATENCY_END(EventFrame, "processed frame", e.frame.getBody());
}
+LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");)
+
void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
QPID_LOG(trace, *this << " DLVR: " << e);
@@ -357,6 +365,7 @@
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) {
+ LATENCY_TRACK(LatencyScope ls(processLatency));
map.incrementFrameSeq();
QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
ConnectionPtr connection = getConnection(e.connectionId, l);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=776463&r1=776462&r2=776463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue May 19 21:18:52 2009
@@ -31,6 +31,9 @@
Multicaster::Multicaster(Cpg& cpg_,
const boost::shared_ptr<sys::Poller>& poller,
boost::function<void()> onError_) :
+#if defined (QPID_LATENCY_TRACKER)
+ cpgLatency("CPG"),
+#endif
onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
holding(true)
@@ -58,6 +61,7 @@
void Multicaster::mcast(const Event& e) {
{
sys::Mutex::ScopedLock l(lock);
+ LATENCY_TRACK(cpgLatency.start());
if (e.getType() == DATA && e.isConnection() && holding) {
holdingQueue.push_back(e);
return;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=776463&r1=776462&r2=776463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Tue May 19 21:18:52 2009
@@ -26,6 +26,7 @@
#include "Event.h"
#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/LatencyTracker.h"
#include <boost/shared_ptr.hpp>
namespace qpid {
@@ -56,6 +57,8 @@
/** End holding mode, held events are mcast */
void release();
+ LATENCY_TRACK(sys::LatencyCounter cpgLatency;)
+
private:
typedef sys::PollableQueue<Event> PollableEventQueue;
typedef std::deque<Event> PlainEventQueue;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=776463&r1=776462&r2=776463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue May 19 21:18:52 2009
@@ -24,6 +24,7 @@
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/LatencyTracker.h"
#include <boost/current_function.hpp>
@@ -41,7 +42,10 @@
moreOutput(), doingOutput()
{}
+LATENCY_TRACK(extern sys::LatencyTracker<const AMQBody*> doOutputTracker;)
+
void OutputInterceptor::send(framing::AMQFrame& f) {
+ LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
parent.getCluster().checkQuorum();
{
// FIXME aconway 2009-04-28: locking around next-> may be redundant
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h?rev=776463&r1=776462&r2=776463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h Tue May 19 21:18:52 2009
@@ -23,82 +23,135 @@
*/
#include "Time.h"
+#include <string>
+#include <limits>
+#include <map>
namespace qpid {
namespace sys {
-/**
- * Record latency between events in the lifecycle of an object.
- * For testing/debugging purposes: use the macros to declare
- * and #define QPID_LATENCY_TRACKER to enable in a build.
+/**@file Tools for measuring latency. NOT SUITABLE FOR PRODUCTION BUILDS.
+ * Uses should be compiled only if QPID_LATENCY_TRACKER is defined.
+ * See the convenience macros at the end of this file.
*/
-template <class T> class LatencyTracker
-{
- public:
- static void start(const char* name, const void* p) { instance.doStart(name, p); }
- static void stage(const char* name, const void* p) { instance.doStage(name, p); }
- static void end(const char* name, const void* p) { instance.doEnd(name, p); }
- private:
-
- LatencyTracker() : object(), times(), totals(), count(), names(), index(), maxIndex() { }
- ~LatencyTracker() { print(); }
+/** Used by LatencyCounter and LatencyTracker below */
+class LatencyStatistic {
+ public:
+ LatencyStatistic(std::string name_) : name(name_), count(0), total(0), min(std::numeric_limits<int64_t>::max()), max(0) {}
+ ~LatencyStatistic() { print(); }
- void doStart(const char* n, const void* p) { if (!object) { name(n); object=p; times[0] = now(); index = 1; } }
- void doStage(const char* n, const void* p) { if (p == object) { name(n); times[index++] = now(); } }
- void doEnd(const char* n, const void* p) { if (p == object) { name(n); times[index++] = now(); record(); object = 0; } }
-
- void name(const char* n) {
- if (names[index] == 0) names[index] = n;
- assert(names[index] == n);
- }
-
- void record() {
- if (maxIndex == 0) maxIndex = index;
- assert(maxIndex == index);
- for (int i = 0; i < index-1; ++i)
- totals[i] += Duration(times[i], times[i+1]);
+ void record(Duration d) {
+ total += d;
++count;
+ if (d > max) max=d;
+ if (d < min) min=d;
}
void print() {
- printf("\nLatency from %s (%lu samples, %d stages) :\n", names[0], count, maxIndex-1);
- for (int i = 0; i < maxIndex-1; ++i)
- printf("to %s:\t%luus\n", names[i+1], (totals[i]/count)/TIME_USEC);
+ if (count) {
+ double meanMsec = (double(total)/count)/TIME_MSEC;
+ printf("\n==== Latency metric %s: samples=%lu mean=%fms (%f-%f)\n", name.c_str(), count, meanMsec, double(min)/TIME_MSEC, double(max)/TIME_MSEC);
+ }
+ else
+ printf("\n==== Latency metric %s: no samples.\n", name.c_str());
}
- static const int SIZE = 1024;
- const void* object;
- AbsTime times[SIZE];
- unsigned long totals[SIZE];
+ private:
+ std::string name;
unsigned long count;
- const char* names[SIZE];
- int index, maxIndex;
+ int64_t total, min, max;
+};
+
+/** Measure delay between seeing the same value at start and finish. */
+template <class T> class LatencyTracker {
+ public:
+ LatencyTracker(std::string name) : measuring(false), stat(name) {}
- static LatencyTracker instance;
+ void start(T value) {
+ sys::Mutex::ScopedLock l(lock);
+ if (!measuring) {
+ measureAt = value;
+ measuring = true;
+ startTime = AbsTime::now();
+ }
+ }
+
+ void finish(T value) {
+ sys::Mutex::ScopedLock l(lock);
+ if(measuring && measureAt == value) {
+ stat.record(Duration(startTime, AbsTime::now()));
+ measuring = false;
+ }
+ }
+
+ private:
+ sys::Mutex lock;
+ bool measuring;
+ T measureAt;
+ AbsTime startTime;
+ LatencyStatistic stat;
};
-template <class T> struct LatencyEndOnExit {
- const char* name;
- const void* ptr;
- LatencyEndOnExit(const char* n, const void* p) : name(n), ptr(p) {}
- ~LatencyEndOnExit() { LatencyTracker<T>::end(name, ptr); }
+
+/** Measures delay between the nth call to start and the nth call to finish.
+ * E.g. to measure latency between sending & receiving an ordered stream of messages.
+ */
+class LatencyCounter {
+ public:
+ LatencyCounter(std::string name) : measuring(false), startCount(0), finishCount(0), stat(name) {}
+
+ void start() {
+ sys::Mutex::ScopedLock l(lock);
+ if (!measuring) {
+ measureAt = startCount;
+ measuring = true;
+ startTime = AbsTime::now();
+ }
+ ++startCount;
+ }
+
+ void finish() {
+ sys::Mutex::ScopedLock l(lock);
+ if (measuring && measureAt == finishCount) {
+ stat.record(Duration(startTime, AbsTime::now()));
+ measuring = false;
+ }
+ ++finishCount;
+ }
+
+ private:
+ sys::Mutex lock;
+ bool measuring;
+ uint64_t startCount, finishCount, measureAt;
+ AbsTime startTime;
+ LatencyStatistic stat;
};
-template <class T> LatencyTracker<T> LatencyTracker<T>::instance;
+/** Measures time spent in a scope. */
+class LatencyScope {
+ public:
+ LatencyScope(LatencyStatistic& s) : stat(s), startTime(AbsTime::now()) {}
+
+ ~LatencyScope() {
+ sys::Mutex::ScopedLock l(lock);
+ stat.record(Duration(startTime, AbsTime::now()));
+ }
+
+ private:
+ sys::Mutex lock;
+ LatencyStatistic& stat;
+ AbsTime startTime;
+};
+
+
+/** Macros to wrap latency tracking so that it is compiled out unless QPID_LATENCY_TRACKER is defined */
#if defined(QPID_LATENCY_TRACKER)
-#define LATENCY_START(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::start(NAME, PTR)
-#define LATENCY_STAGE(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::stage(NAME, PTR)
-#define LATENCY_END(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::end(NAME, PTR)
-#define LATENCY_END_ON_EXIT(TAG, NAME, PTR) ::qpid::sys::LatencyEndOnExit<TAG>(NAME, PTR)
+#define LATENCY_TRACK(X) X
#else
-#define LATENCY_START(TAG, NAME, PTR) void(0)
-#define LATENCY_STAGE(TAG, NAME, PTR) void(0)
-#define LATENCY_END(TAG, NAME, PTR) void(0)
-#define LATENCY_END_ON_EXIT(TAG, NAME, PTR) void(0)
+#define LATENCY_TRACK(X)
#endif
-
}} // namespace qpid::sys
#endif /*!QPID_SYS_LATENCYTRACKER_H*/
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org