You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/05/19 23:18:54 UTC

svn commit: r776463 - in /qpid/trunk/qpid/cpp/src/qpid: cluster/Cluster.cpp cluster/Multicaster.cpp cluster/Multicaster.h cluster/OutputInterceptor.cpp sys/LatencyTracker.h

Author: aconway
Date: Tue May 19 21:18:52 2009
New Revision: 776463

URL: http://svn.apache.org/viewvc?rev=776463&view=rev
Log:
Instrumentation for measuring latencies.

Compiled out of normal builds, enable with -DQPID_LATENCY_TRACKER.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=776463&r1=776462&r2=776463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue May 19 21:18:52 2009
@@ -109,6 +109,7 @@
 #include "qpid/framing/ClusterShutdownBody.h"
 #include "qpid/framing/ClusterUpdateOfferBody.h"
 #include "qpid/framing/ClusterUpdateRequestBody.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Helpers.h"
 #include "qpid/log/Statement.h"
 #include "qpid/management/IdAllocator.h"
@@ -294,23 +295,27 @@
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
     Event e(Event::decodeCopy(from, buf));
+    LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish());
     deliverEvent(e);
 }
 
+LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");)
+LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");)
+
 void Cluster::deliverEvent(const Event& e) {
-    LATENCY_START(Event, "enqueue event", e.getData());
+    LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());)
     deliverEventQueue.push(e);
 }
 
 void Cluster::deliverFrame(const EventFrame& e) {
-    LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody());
+    LATENCY_TRACK(frameQueueLatencyTracker.start(e.frame.getBody()));
     deliverFrameQueue.push(e);
 }
 
 // Handler for deliverEventQueue.
 // This thread decodes frames from events.
 void Cluster::deliveredEvent(const Event& e) {
-    LATENCY_STAGE(Event, "dequeue event", e.getData());
+    LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData()));
     QPID_LOG(trace, *this << " DLVR: " << e);
     if (e.isCluster()) {
         EventFrame ef(e, e.getFrame());
@@ -329,7 +334,6 @@
     }
     else // Discard connection events if discarding is set.
         QPID_LOG(trace, *this << " DROP: " << e);
-    LATENCY_END(Event, "processed event", e.getData());
 }
 
 void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
@@ -337,18 +341,22 @@
     error.error(connection, type, map.getFrameSeq(), map.getMembers());
 }
 
+LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
+
 // Handler for deliverFrameQueue.
 // This thread executes the main logic.
 void Cluster::deliveredFrame(const EventFrame& e) {
-    LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody());
+    LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
+    LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
     Mutex::ScopedLock l(lock);
     // Process each frame through the error checker.
     error.delivered(e);
     while (error.canProcess())  // There is a frame ready to process.
         processFrame(error.getNext(), l);
-    LATENCY_END(EventFrame, "processed frame", e.frame.getBody());
 }
 
+LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");)
+
 void Cluster::processFrame(const EventFrame& e, Lock& l) {
     if (e.isCluster()) {
         QPID_LOG(trace, *this << " DLVR: " << e);
@@ -357,6 +365,7 @@
             throw Exception(QPID_MSG("Invalid cluster control"));
     }
     else if (state >= CATCHUP) {
+        LATENCY_TRACK(LatencyScope ls(processLatency));
         map.incrementFrameSeq();
         QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ":  " << e);
         ConnectionPtr connection = getConnection(e.connectionId, l);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=776463&r1=776462&r2=776463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue May 19 21:18:52 2009
@@ -31,6 +31,9 @@
 Multicaster::Multicaster(Cpg& cpg_, 
                          const boost::shared_ptr<sys::Poller>& poller,
                          boost::function<void()> onError_) :
+#if defined (QPID_LATENCY_TRACKER)
+    cpgLatency("CPG"),
+#endif
     onError(onError_), cpg(cpg_), 
     queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
     holding(true)
@@ -58,6 +61,7 @@
 void Multicaster::mcast(const Event& e) {
     {
         sys::Mutex::ScopedLock l(lock);
+        LATENCY_TRACK(cpgLatency.start());
         if (e.getType() == DATA && e.isConnection() && holding) {
             holdingQueue.push_back(e); 
             return;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=776463&r1=776462&r2=776463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Tue May 19 21:18:52 2009
@@ -26,6 +26,7 @@
 #include "Event.h"
 #include "qpid/sys/PollableQueue.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/LatencyTracker.h"
 #include <boost/shared_ptr.hpp>
 
 namespace qpid {
@@ -56,6 +57,8 @@
     /** End holding mode, held events are mcast */
     void release();
     
+    LATENCY_TRACK(sys::LatencyCounter cpgLatency;)
+    
   private:
     typedef sys::PollableQueue<Event> PollableEventQueue;
     typedef std::deque<Event> PlainEventQueue;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=776463&r1=776462&r2=776463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue May 19 21:18:52 2009
@@ -24,6 +24,7 @@
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/log/Statement.h"
+#include "qpid/sys/LatencyTracker.h"
 #include <boost/current_function.hpp>
 
 
@@ -41,7 +42,10 @@
       moreOutput(), doingOutput()
 {}
 
+LATENCY_TRACK(extern sys::LatencyTracker<const AMQBody*> doOutputTracker;)
+
 void OutputInterceptor::send(framing::AMQFrame& f) {
+    LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
     parent.getCluster().checkQuorum();
     {
         // FIXME aconway 2009-04-28: locking around next-> may be redundant

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h?rev=776463&r1=776462&r2=776463&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h Tue May 19 21:18:52 2009
@@ -23,82 +23,135 @@
  */
 
 #include "Time.h"
+#include <string>
+#include <limits>
+#include <map>
 
 namespace qpid {
 namespace sys {
 
-/**
- * Record latency between events in the lifecycle of an object.
- * For testing/debugging purposes: use the macros to declare
- * and #define QPID_LATENCY_TRACKER to enable in a build.
+/**@file Tools for measuring latency. NOT SUITABLE FOR PROUDCTION BUILDS.
+ * Uses should be compiled only if QPID_LATENCY_TRACKER is defined.
+ * See the convenience macros at the end of this file.
  */
-template <class T> class LatencyTracker
-{
-  public:
-    static void start(const char* name, const void* p) {  instance.doStart(name, p); }
-    static void stage(const char* name, const void* p) { instance.doStage(name, p); }
-    static void end(const char* name, const void* p) { instance.doEnd(name, p); }
 
-  private:
-    
-    LatencyTracker() : object(), times(), totals(), count(), names(), index(), maxIndex() { }
-    ~LatencyTracker() { print(); }
+/** Used by LatencyCounter and LatencyTracker below */
+class LatencyStatistic {
+  public:
+    LatencyStatistic(std::string name_) : name(name_), count(0), total(0), min(std::numeric_limits<int64_t>::max()), max(0) {}
+    ~LatencyStatistic() { print(); }
 
-    void doStart(const char* n, const void* p) { if (!object) { name(n); object=p; times[0] = now(); index = 1; } }
-    void doStage(const char* n, const void* p) { if (p == object) { name(n); times[index++] = now(); } }
-    void doEnd(const char* n, const void* p) { if (p == object) { name(n); times[index++] = now(); record(); object = 0; } }
-
-    void name(const char* n) {
-        if (names[index] == 0) names[index] = n;
-        assert(names[index] == n);
-    }
- 
-    void record() {
-        if (maxIndex == 0) maxIndex = index;
-        assert(maxIndex == index);
-        for (int i = 0; i < index-1; ++i) 
-            totals[i] += Duration(times[i], times[i+1]);
+    void record(Duration d) {
+        total += d;
         ++count;
+        if (d > max) max=d;
+        if (d < min) min=d;
     }
     
     void print() {
-        printf("\nLatency from %s (%lu samples, %d stages) :\n", names[0], count, maxIndex-1);
-        for (int i = 0; i < maxIndex-1; ++i) 
-            printf("to %s:\t%luus\n", names[i+1], (totals[i]/count)/TIME_USEC);
+        if (count) {
+            double meanMsec =  (double(total)/count)/TIME_MSEC;
+            printf("\n==== Latency metric %s: samples=%lu mean=%fms (%f-%f)\n", name.c_str(), count, meanMsec, double(min)/TIME_MSEC, double(max)/TIME_MSEC);
+        }
+        else 
+            printf("\n==== Latency metric %s: no samples.\n", name.c_str());
     }
 
-    static const int SIZE = 1024;
-    const void* object;
-    AbsTime times[SIZE];
-    unsigned long totals[SIZE];
+  private:
+    std::string name;
     unsigned long count;
-    const char* names[SIZE];
-    int index, maxIndex;
+    int64_t total, min, max;
+};
+    
+/** Measure delay between seeing the same value at start and finish. */
+template <class T> class LatencyTracker {
+  public:
+    LatencyTracker(std::string name) : measuring(false), stat(name) {}
 
-    static LatencyTracker instance;
+    void start(T value) {
+        sys::Mutex::ScopedLock l(lock);
+        if (!measuring) {
+            measureAt = value;
+            measuring = true;
+            startTime = AbsTime::now();
+        }
+    }
+    
+    void finish(T value) {
+        sys::Mutex::ScopedLock l(lock);
+        if(measuring && measureAt == value) {
+            stat.record(Duration(startTime, AbsTime::now()));
+            measuring = false;
+        }
+    }
+
+  private:
+    sys::Mutex lock;
+    bool measuring;
+    T measureAt;
+    AbsTime startTime;
+    LatencyStatistic stat;
 };
 
-template <class T> struct LatencyEndOnExit {
-    const char* name;
-    const void* ptr;
-    LatencyEndOnExit(const char* n, const void* p) : name(n), ptr(p) {}
-    ~LatencyEndOnExit() { LatencyTracker<T>::end(name, ptr); }
+
+/** Measures delay between the nth call to start and the nth call to finish.
+ * E.g. to measure latency between sending & receiving an ordered stream of messages.
+ */
+class LatencyCounter {
+  public:
+    LatencyCounter(std::string name) : measuring(false), startCount(0), finishCount(0), stat(name) {}
+
+    void start() {
+        sys::Mutex::ScopedLock l(lock);
+        if (!measuring) {
+            measureAt = startCount;
+            measuring = true;
+            startTime = AbsTime::now();
+        }
+        ++startCount;
+    }
+
+    void finish() {
+        sys::Mutex::ScopedLock l(lock);
+        if (measuring && measureAt == finishCount) {
+            stat.record(Duration(startTime, AbsTime::now()));
+            measuring = false;
+        }
+        ++finishCount;
+    }
+
+  private:
+    sys::Mutex lock;
+    bool measuring;
+    uint64_t startCount, finishCount, measureAt;
+    AbsTime startTime;
+    LatencyStatistic stat;
 };
 
-template <class T> LatencyTracker<T> LatencyTracker<T>::instance;
+/** Measures time spent in a scope. */
+class LatencyScope {
+  public:
+    LatencyScope(LatencyStatistic& s) : stat(s), startTime(AbsTime::now()) {}
+
+    ~LatencyScope() { 
+        sys::Mutex::ScopedLock l(lock);
+        stat.record(Duration(startTime, AbsTime::now()));
+    }
+
+  private:
+    sys::Mutex lock;
+    LatencyStatistic& stat;
+    AbsTime startTime;
+};
+
+    
+/** Macros to wrap latency tracking so disabled unless QPID_LATENCY_TRACKER is defined */
 
 #if defined(QPID_LATENCY_TRACKER)
-#define LATENCY_START(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::start(NAME, PTR)
-#define LATENCY_STAGE(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::stage(NAME, PTR)
-#define LATENCY_END(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::end(NAME, PTR)
-#define LATENCY_END_ON_EXIT(TAG, NAME, PTR) ::qpid::sys::LatencyEndOnExit<TAG>(NAME, PTR)
+#define LATENCY_TRACK(X) X
 #else
-#define LATENCY_START(TAG, NAME, PTR) void(0)
-#define LATENCY_STAGE(TAG, NAME, PTR) void(0)
-#define LATENCY_END(TAG, NAME, PTR) void(0)
-#define LATENCY_END_ON_EXIT(TAG, NAME, PTR) void(0) 
+#define LATENCY_TRACK(X)
 #endif
-
 }} // namespace qpid::sys
 
 #endif  /*!QPID_SYS_LATENCYTRACKER_H*/



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org