You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/01/20 23:11:37 UTC

svn commit: r736135 - in /qpid/trunk/qpid/cpp/src: Makefile.am qpid/cluster/Cluster.cpp qpid/cluster/Event.cpp qpid/cluster/Event.h qpid/cluster/Multicaster.cpp qpid/cluster/Multicaster.h qpid/sys/LatencyMetric.cpp qpid/sys/LatencyMetric.h

Author: aconway
Date: Tue Jan 20 14:11:37 2009
New Revision: 736135

URL: http://svn.apache.org/viewvc?rev=736135&view=rev
Log:
Latency measurements, compiled out of production code.

Added:
    qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=736135&r1=736134&r2=736135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Tue Jan 20 14:11:37 2009
@@ -349,6 +349,8 @@
   qpid/sys/AsynchIOHandler.cpp \
   qpid/sys/Dispatcher.cpp \
   qpid/sys/DispatchHandle.cpp \
+  qpid/sys/LatencyMetric.cpp \
+  qpid/sys/LatencyMetric.h \
   qpid/sys/Runnable.cpp \
   qpid/sys/Shlib.cpp \
   qpid/sys/Timer.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=736135&r1=736134&r2=736135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jan 20 14:11:37 2009
@@ -38,6 +38,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/log/Helpers.h"
 #include "qpid/sys/Thread.h"
+#include "qpid/sys/LatencyMetric.h"
 #include "qpid/memory.h"
 #include "qpid/shared_ptr.h"
 #include "qmf/org/apache/qpid/cluster/Package.h"
@@ -182,7 +183,7 @@
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
     Event e(Event::decodeCopy(from, buf));
-    if (from == myId) // Record self-deliveries for flow control.
+    if (from == myId)  // Record self-deliveries for flow control.
         mcast.selfDeliver(e);
     deliver(e, l);
 }
@@ -206,6 +207,7 @@
 }
 
 void Cluster::deliveredEvent(const EventHeader& e, const char* data) {
+    QPID_LATENCY_RECORD("deliver queue", e);
     Buffer buf(const_cast<char*>(data), e.getSize());
     AMQFrame frame;
     if (e.isCluster())  {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=736135&r1=736134&r2=736135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Tue Jan 20 14:11:37 2009
@@ -35,16 +35,21 @@
 const size_t EventHeader::HEADER_SIZE =
     sizeof(uint8_t) +  // type
     sizeof(uint64_t) + // connection pointer only, CPG provides member ID.
-    sizeof(uint32_t);  // payload size
+    sizeof(uint32_t)  // payload size
+#ifdef QPID_LATENCY_METRIC
+    + sizeof(int64_t)           // timestamp
+#endif
+    ;
 
 EventHeader::EventHeader(EventType t, const ConnectionId& c,  size_t s)
     : type(t), connectionId(c), size(s) {}
 
+
+Event::Event() {}
+
 Event::Event(EventType t, const ConnectionId& c,  size_t s)
     : EventHeader(t,c,s), store(RefCountedBuffer::create(s+HEADER_SIZE))
-{
-    encodeHeader();
-}
+{}
 
 void EventHeader::decode(const MemberId& m, framing::Buffer& buf) {
     if (buf.available() <= HEADER_SIZE)
@@ -54,14 +59,17 @@
         throw ClusterLeaveException("Invalid multicast event type");
     connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong()));
     size = buf.getLong();
+#ifdef QPID_LATENCY_METRIC
+    latency_metric_timestamp = buf.getLongLong();
+#endif
 }
 
 Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) {
-    EventHeader h;
-    h.decode(m, buf);           // Header
-    Event e(h.getType(), h.getConnectionId(), h.getSize());
+    Event e;
+    e.decode(m, buf);           // Header
     if (buf.available() < e.size)
         throw ClusterLeaveException("Not enough data for multicast event");
+    e.store = RefCountedBuffer::create(e.size + HEADER_SIZE);
     memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size);
     return e;
 }
@@ -73,11 +81,20 @@
     f.encode(buf);
     return e;
 }
-    
+
+iovec Event::toIovec() {
+    encodeHeader();
+    iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
+    return iov;
+}
+
 void EventHeader::encode(Buffer& b) const {
     b.putOctet(type);
     b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
     b.putLong(size);
+#ifdef QPID_LATENCY_METRIC
+    b.putLongLong(latency_metric_timestamp);
+#endif
 }
 
 // Encode my header in my buffer.

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=736135&r1=736134&r2=736135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Tue Jan 20 14:11:37 2009
@@ -27,6 +27,8 @@
 #include "Connection.h"
 #include "qpid/RefCountedBuffer.h"
 #include "qpid/framing/Buffer.h"
+#include "qpid/sys/LatencyMetric.h"
+#include <sys/uio.h>            // For iovec
 #include <iosfwd>
 
 namespace qpid {
@@ -37,7 +39,7 @@
 // 
 
 /** Header data for a multicast event */
-class EventHeader {
+class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {
   public:
     EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0);
     void decode(const MemberId& m, framing::Buffer&);
@@ -65,8 +67,9 @@
  */
 class Event : public EventHeader {
   public:
+    Event();
     /** Create an event with a buffer that can hold size bytes plus an event header. */
-    Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0);
+    Event(EventType t, const ConnectionId& c, size_t);
 
     /** Create an event copied from delivered data. */
     static Event decodeCopy(const MemberId& m, framing::Buffer&);
@@ -85,6 +88,8 @@
     
     operator framing::Buffer() const;
 
+    iovec toIovec();
+    
   private:
     void encodeHeader();
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=736135&r1=736134&r2=736135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue Jan 20 14:11:37 2009
@@ -23,7 +23,7 @@
 #include "Cpg.h"
 #include "ClusterLeaveException.h"
 #include "qpid/log/Statement.h"
-
+#include "qpid/sys/LatencyMetric.h"
 
 namespace qpid {
 namespace cluster {
@@ -59,8 +59,8 @@
             return;
         }
     }
+    QPID_LATENCY_INIT(e);
     queue.push(e);
-
 }
 
 
@@ -76,7 +76,8 @@
                 }
                 ++pending;
             }
-            iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() };
+            QPID_LATENCY_RECORD("mcast send queue", *i);
+            iovec iov = i->toIovec();
             if (!cpg.mcast(&iov, 1)) {
                 // cpg didn't send because of CPG flow control.
                 if (mcastMax) {
@@ -104,8 +105,9 @@
     holdingQueue.clear();
 }
 
-void Multicaster::selfDeliver(const Event&) {
+void Multicaster::selfDeliver(const Event& e) {
     sys::Mutex::ScopedLock l(lock);
+    QPID_LATENCY_RECORD("cpg self deliver", e);
     if (mcastMax) {
         assert(pending > 0);
         assert(pending <= mcastMax);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=736135&r1=736134&r2=736135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Tue Jan 20 14:11:37 2009
@@ -27,7 +27,6 @@
 #include "qpid/sys/PollableQueue.h"
 #include "qpid/sys/Mutex.h"
 #include <boost/shared_ptr.hpp>
-#include <sys/uio.h>            // For iovec
 
 namespace qpid {
 

Added: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp?rev=736135&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp Tue Jan 20 14:11:37 2009
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifdef QPID_LATENCY_METRIC
+
+#include "LatencyMetric.h"
+#include "Time.h"
+#include <iostream>
+
+namespace qpid {
+namespace sys {
+
+void LatencyMetricTimestamp::initialize(const LatencyMetricTimestamp& ts) {
+    const_cast<int64_t&>(ts.latency_metric_timestamp) = Duration(now());
+}
+
+LatencyMetric::LatencyMetric(const char* msg, int64_t skip_) : 
+    message(msg), count(0), total(0), skipped(0), skip(skip_)
+{}
+
+LatencyMetric::~LatencyMetric() { report(); }
+    
+void LatencyMetric::record(const LatencyMetricTimestamp& start) {
+    Mutex::ScopedLock l(lock);  // FIXME aconway 2009-01-20: atomics?
+    if (!start.latency_metric_timestamp) return; // Ignore 0 timestamps.
+    if (skip) {
+        if (++skipped < skip) return;
+        else skipped = 0;
+    }
+    ++count;
+    int64_t now_ = Duration(now());
+    total += now_ - start.latency_metric_timestamp;
+    // Set start time for next leg of the journey 
+    const_cast<int64_t&>(start.latency_metric_timestamp) = now_; 
+}
+
+void LatencyMetric::report() {
+    using namespace std;
+    if (count) {
+        cout << "LATENCY: " << message << ": "
+             << total / (count * TIME_USEC) << " microseconds" << endl;
+    }
+    else {
+        cout << "LATENCY: " << message << ": no data." << endl;
+    }
+    count = 0;
+    total = 0; 
+}
+
+
+}} // namespace qpid::sys
+
+#endif 

Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h?rev=736135&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h Tue Jan 20 14:11:37 2009
@@ -0,0 +1,80 @@
+#ifndef QPID_SYS_LATENCYMETRIC_H
+#define QPID_SYS_LATENCYMETRIC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifdef QPID_LATENCY_METRIC
+
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace sys {
+
+/** Use this base class to add a timestamp for latency to an object */
+struct LatencyMetricTimestamp {
+    LatencyMetricTimestamp() : latency_metric_timestamp(0) {}
+    static void initialize(const LatencyMetricTimestamp&);
+    int64_t latency_metric_timestamp;
+};
+    
+/**
+ * Record average latencies, report on destruction.
+ *
+ * For debugging only, use via macros below so it can be compiled out
+ * of production code.
+ */
+class LatencyMetric {
+  public:
+    /** msg should be a string literal. */
+    LatencyMetric(const char* msg, int64_t skip_=0);
+    ~LatencyMetric();
+    
+    void record(const LatencyMetricTimestamp& start);
+
+  private:
+    void report();
+    Mutex lock;
+    const char* message;
+    int64_t ignore, count, total, skipped, skip;
+};
+
+}} // namespace qpid::sys
+
+#define QPID_LATENCY_INIT(x) ::qpid::sys::LatencyMetricTimestamp::initialize(x)
+#define QPID_LATENCY_RECORD(msg, x) do {                                 \
+        static ::qpid::sys::LatencyMetric metric__(msg); metric__.record(x); \
+    } while (false)
+
+
+#else  /* defined QPID_LATENCY_METRIC */
+
+namespace qpid { namespace sys {
+class LatencyMetricTimestamp {};
+}}
+
+#define QPID_LATENCY_INIT(x) (void)x
+#define QPID_LATENCY_RECORD(msg, x) (void)x
+
+#endif /* defined QPID_LATENCY_METRIC */
+
+#endif  /*!QPID_SYS_LATENCYMETRIC_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyMetric.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org