You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/03/09 06:20:04 UTC

svn commit: r920681 - in /incubator/thrift/trunk/lib/cpp/src/concurrency: Mutex.cpp Mutex.h Util.cpp Util.h

Author: dreiss
Date: Tue Mar  9 05:20:04 2010
New Revision: 920681

URL: http://svn.apache.org/viewvc?rev=920681&view=rev
Log:
cpp: Add profiling hooks to Mutex, ReadWriteMutex classes

Extend the Thrift C++ Concurrency library by allowing a user to register
a callback and a sample rate for lock primitive contention profiling.
The callback will be invoked approximately once every sampleRate calls
to Mutex::lock(), Mutex::timedlock(), ReadWriteMutex::acquireRead(), or
ReadWriteMutex::acquireWrite().

The callback receives a pointer identifying the mutex responsible and the
time waited on the lock in microseconds (whether or not the lock was
successfully acquired).  The user can then implement a registry of their
choice to log/collect this data as needed.

All of this can be compiled out by defining THRIFT_NO_CONTENTION_PROFILING
if it harms performance.  By default, there is no profiling callback, so
the overhead is minimal (one branch).

Modified:
    incubator/thrift/trunk/lib/cpp/src/concurrency/Mutex.cpp
    incubator/thrift/trunk/lib/cpp/src/concurrency/Mutex.h
    incubator/thrift/trunk/lib/cpp/src/concurrency/Util.cpp
    incubator/thrift/trunk/lib/cpp/src/concurrency/Util.h

Modified: incubator/thrift/trunk/lib/cpp/src/concurrency/Mutex.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/concurrency/Mutex.cpp?rev=920681&r1=920680&r2=920681&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/concurrency/Mutex.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/src/concurrency/Mutex.cpp Tue Mar  9 05:20:04 2010
@@ -22,11 +22,95 @@
 
 #include <assert.h>
 #include <pthread.h>
+#include <signal.h>
 
 using boost::shared_ptr;
 
 namespace apache { namespace thrift { namespace concurrency {
 
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+
+static sig_atomic_t mutexProfilingSampleRate = 0;
+static MutexWaitCallback mutexProfilingCallback = 0;
+
+static volatile sig_atomic_t mutexProfilingCounter = 0;
+
+void enableMutexProfiling(int32_t profilingSampleRate,
+                          MutexWaitCallback callback) {
+  mutexProfilingSampleRate = profilingSampleRate;
+  mutexProfilingCallback = callback;
+}
+
+// Profiling helpers.  A sampled exclusive acquire stores its wait time in
+// profileTime_ and reports it from unlock/release, outside the critical
+// section.  profileTime_ is only cleared when non-zero because shared
+// (read) holders of a ReadWriteMutex run the release path concurrently.
+// The callback is re-read and NULL-checked before each call because
+// enableMutexProfiling() is unsynchronized and may disable profiling
+// between the start and the end of a sample.
+#define PROFILE_MUTEX_START_LOCK() \
+    int64_t _lock_startTime = maybeGetProfilingStartTime();
+
+#define PROFILE_MUTEX_NOT_LOCKED() \
+  do { \
+    if (_lock_startTime > 0) { \
+      MutexWaitCallback _cb = mutexProfilingCallback; \
+      if (_cb) (*_cb)(this, Util::currentTimeUsec() - _lock_startTime); \
+    } \
+  } while (0)
+
+#define PROFILE_MUTEX_LOCKED() \
+  do { \
+    profileTime_ = _lock_startTime; \
+    if (profileTime_ > 0) { \
+      profileTime_ = Util::currentTimeUsec() - profileTime_; \
+    } \
+  } while (0)
+
+#define PROFILE_MUTEX_START_UNLOCK() \
+  int64_t _temp_profileTime = profileTime_; \
+  if (_temp_profileTime != 0) profileTime_ = 0;
+
+#define PROFILE_MUTEX_UNLOCKED() \
+  do { \
+    if (_temp_profileTime > 0) { \
+      MutexWaitCallback _cb = mutexProfilingCallback; \
+      if (_cb) (*_cb)(this, _temp_profileTime); \
+    } \
+  } while (0)
+
+static inline int64_t maybeGetProfilingStartTime() {
+  if (mutexProfilingSampleRate && mutexProfilingCallback) {
+    // This block is unsynchronized, but should produce a reasonable sampling
+    // rate on most architectures.  The main race conditions are the gap
+    // between the decrement and the test, the non-atomicity of decrement, and
+    // potential caching of different values at different CPUs.
+    //
+    // - if two decrements race, the likeliest result is that the counter
+    //      decrements slowly (perhaps much more slowly) than intended.
+    //
+    // - many threads could potentially decrement before resetting the counter
+    //      to its large value, causing each additional incoming thread to
+    //      profile every call.  This situation is unlikely to persist for long
+    //      as the critical gap is quite short, but profiling could be bursty.
+    sig_atomic_t localValue = --mutexProfilingCounter;
+    if (localValue <= 0) {
+      mutexProfilingCounter = mutexProfilingSampleRate;
+      return Util::currentTimeUsec();
+    }
+  }
+
+  return 0;
+}
+
+#else
+#  define PROFILE_MUTEX_START_LOCK()
+#  define PROFILE_MUTEX_NOT_LOCKED()
+#  define PROFILE_MUTEX_LOCKED()
+#  define PROFILE_MUTEX_START_UNLOCK()
+#  define PROFILE_MUTEX_UNLOCKED()
+#endif // THRIFT_NO_CONTENTION_PROFILING
+
 /**
  * Implementation of Mutex class using POSIX mutex
  *
@@ -35,6 +119,9 @@ namespace apache { namespace thrift { na
 class Mutex::impl {
  public:
   impl(Initializer init) : initialized_(false) {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+    profileTime_ = 0;
+#endif
     init(&pthread_mutex_);
     initialized_ = true;
   }
@@ -47,23 +134,43 @@ class Mutex::impl {
     }
   }
 
-  void lock() const { pthread_mutex_lock(&pthread_mutex_); }
+  void lock() const {
+    PROFILE_MUTEX_START_LOCK();
+    pthread_mutex_lock(&pthread_mutex_);
+    PROFILE_MUTEX_LOCKED();
+  }
 
   bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }
 
   bool timedlock(int64_t milliseconds) const {
+    PROFILE_MUTEX_START_LOCK();
+
     struct timespec ts;
     Util::toTimespec(ts, milliseconds);
-    return (0 == pthread_mutex_timedlock(&pthread_mutex_, &ts));
+    int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts);
+    if (ret == 0) {
+      PROFILE_MUTEX_LOCKED();
+      return true;
+    }
+
+    PROFILE_MUTEX_NOT_LOCKED();
+    return false;
   }
 
-  void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
+  void unlock() const {
+    PROFILE_MUTEX_START_UNLOCK();
+    pthread_mutex_unlock(&pthread_mutex_);
+    PROFILE_MUTEX_UNLOCKED();
+  }
 
   void* getUnderlyingImpl() const { return (void*) &pthread_mutex_; }
 
  private:
   mutable pthread_mutex_t pthread_mutex_;
   mutable bool initialized_;
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+  mutable int64_t profileTime_;
+#endif
 };
 
 Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {}
@@ -129,6 +236,9 @@ void Mutex::RECURSIVE_INITIALIZER(void* 
 class ReadWriteMutex::impl {
 public:
   impl() : initialized_(false) {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+    profileTime_ = 0;
+#endif
     int ret = pthread_rwlock_init(&rw_lock_, NULL);
     assert(ret == 0);
     initialized_ = true;
@@ -142,19 +252,34 @@ public:
     }
   }
 
-  void acquireRead() const { pthread_rwlock_rdlock(&rw_lock_); }
+  void acquireRead() const {
+    PROFILE_MUTEX_START_LOCK();
+    pthread_rwlock_rdlock(&rw_lock_);
+    PROFILE_MUTEX_NOT_LOCKED();  // shared: report now, profileTime_ is per-lock
+  }
 
-  void acquireWrite() const { pthread_rwlock_wrlock(&rw_lock_); }
+  void acquireWrite() const {
+    PROFILE_MUTEX_START_LOCK();
+    pthread_rwlock_wrlock(&rw_lock_);
+    PROFILE_MUTEX_LOCKED();
+  }
 
-  bool attemptRead() const { return pthread_rwlock_tryrdlock(&rw_lock_); }
+  bool attemptRead() const { return 0 == pthread_rwlock_tryrdlock(&rw_lock_); }
 
-  bool attemptWrite() const { return pthread_rwlock_trywrlock(&rw_lock_); }
+  bool attemptWrite() const { return 0 == pthread_rwlock_trywrlock(&rw_lock_); }
 
-  void release() const { pthread_rwlock_unlock(&rw_lock_); }
+  void release() const {
+    PROFILE_MUTEX_START_UNLOCK();
+    pthread_rwlock_unlock(&rw_lock_);
+    PROFILE_MUTEX_UNLOCKED();
+  }
 
 private:
   mutable pthread_rwlock_t rw_lock_;
   mutable bool initialized_;
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+  mutable int64_t profileTime_;
+#endif
 };
 
 ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) {}

Modified: incubator/thrift/trunk/lib/cpp/src/concurrency/Mutex.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/concurrency/Mutex.h?rev=920681&r1=920680&r2=920681&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/concurrency/Mutex.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/concurrency/Mutex.h Tue Mar  9 05:20:04 2010
@@ -24,6 +24,32 @@
 
 namespace apache { namespace thrift { namespace concurrency {
 
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+
+/**
+ * Determines if the Thrift Mutex and ReadWriteMutex classes will attempt to
+ * profile their blocking acquire methods. If profilingSampleRate is non-zero
+ * and callback is non-NULL, Thrift will try to invoke the callback about once
+ * every profilingSampleRate acquires (0 or NULL disables profiling).  As the
+ * sampling is unsynchronized the rate is not guaranteed and may be subject to
+ * big bursts and swings; please ensure your callback is suitably cheap.
+ *
+ * The callback is passed the time spent waiting for the lock in usec and a
+ * (void*) that uniquely identifies the Mutex (or ReadWriteMutex) being locked.
+ * Successful exclusive locks report at unlock; others before the call returns.
+ *
+ * The enableMutexProfiling() function is unsynchronized; calling this function
+ * while profiling is already enabled may result in race conditions.  On
+ * architectures where a pointer assignment is atomic, this is safe but there
+ * is no guarantee threads will agree on a single callback within any
+ * particular time period.
+ */
+typedef void (*MutexWaitCallback)(const void* id, int64_t waitTimeMicros);
+void enableMutexProfiling(int32_t profilingSampleRate,
+                          MutexWaitCallback callback);
+
+#endif
+
 /**
  * A simple mutex class
  *

Modified: incubator/thrift/trunk/lib/cpp/src/concurrency/Util.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/concurrency/Util.cpp?rev=920681&r1=920680&r2=920681&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/concurrency/Util.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/src/concurrency/Util.cpp Tue Mar  9 05:20:04 2010
@@ -31,19 +31,19 @@
 
 namespace apache { namespace thrift { namespace concurrency {
 
-const int64_t Util::currentTime() {
+const int64_t Util::currentTimeTicks(int64_t ticksPerSec) {  // wall-clock ticks since epoch
   int64_t result;
 
 #if defined(HAVE_CLOCK_GETTIME)
   struct timespec now;
   int ret = clock_gettime(CLOCK_REALTIME, &now);
   assert(ret == 0);
-  toMilliseconds(result, now);
+  toTicks(result, now, ticksPerSec);  // NOTE(review): REALTIME can step; elapsed-time users beware
 #elif defined(HAVE_GETTIMEOFDAY)
   struct timeval now;
   int ret = gettimeofday(&now, NULL);
   assert(ret == 0);
-  toMilliseconds(result, now);
+  toTicks(result, now, ticksPerSec);
 #else
 #error "No high-precision clock is available."
 #endif // defined(HAVE_CLOCK_GETTIME)
@@ -51,5 +51,4 @@ const int64_t Util::currentTime() {
   return result;
 }
 
-
 }}} // apache::thrift::concurrency

Modified: incubator/thrift/trunk/lib/cpp/src/concurrency/Util.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/concurrency/Util.h?rev=920681&r1=920680&r2=920681&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/concurrency/Util.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/concurrency/Util.h Tue Mar  9 05:20:04 2010
@@ -47,6 +47,7 @@ class Util {
   static const int64_t MS_PER_S = 1000LL;
 
   static const int64_t NS_PER_MS = NS_PER_S / MS_PER_S;
+  static const int64_t NS_PER_US = NS_PER_S / US_PER_S;
   static const int64_t US_PER_MS = US_PER_S / MS_PER_S;
 
  public:
@@ -67,32 +68,86 @@ class Util {
     result.tv_usec = (value % MS_PER_S) * US_PER_MS; // ms to us
   }
 
+  /**
+   * Converts whole seconds plus a sub-second count in 1/oldTicksPerSec units
+   * into 1/newTicksPerSec ticks.  When converting to a coarser unit the
+   * sub-second part is rounded half up (exactly if the rates divide evenly).
+   */
+  static void toTicks(int64_t& result, int64_t secs, int64_t oldTicks,
+                      int64_t oldTicksPerSec, int64_t newTicksPerSec) {
+    result = secs * newTicksPerSec;
+    result += oldTicks * newTicksPerSec / oldTicksPerSec;
+
+    // Round to nearest only when converting to a coarser unit.  When
+    // oldPerNew is 1 nothing was truncated, and oldPerNew / 2 == 0 would
+    // otherwise make the test always true and add a spurious tick.
+    int64_t oldPerNew = oldTicksPerSec / newTicksPerSec;
+    if (oldPerNew > 1 && ((oldTicks % oldPerNew) >= (oldPerNew / 2))) {
+      ++result;
+    }
+  }
+  /**
+   * Converts struct timespec to arbitrary-sized ticks since epoch
+   */
+  static void toTicks(int64_t& result,
+                      const struct timespec& value,
+                      int64_t ticksPerSec) {
+    toTicks(result, value.tv_sec, value.tv_nsec, NS_PER_S, ticksPerSec);
+  }
+
+  /**
+   * Converts struct timeval to arbitrary-sized ticks since epoch
+   */
+  static void toTicks(int64_t& result,
+                      const struct timeval& value,
+                      int64_t ticksPerSec) {
+    toTicks(result, value.tv_sec, value.tv_usec, US_PER_S, ticksPerSec);
+  }
+
   /**
    * Converts struct timespec to milliseconds
    */
-  static const void toMilliseconds(int64_t& result, const struct timespec& value) {
-    result = (value.tv_sec * MS_PER_S) + (value.tv_nsec / NS_PER_MS);
-    // round up -- int64_t cast is to avoid a compiler error for some GCCs
-    if (int64_t(value.tv_nsec) % NS_PER_MS >= (NS_PER_MS / 2)) {
-      ++result;
-    }
+  static void toMilliseconds(int64_t& result,
+                             const struct timespec& value) {
+    toTicks(result, value, MS_PER_S);
   }
 
   /**
    * Converts struct timeval to milliseconds
    */
-  static const void toMilliseconds(int64_t& result, const struct timeval& value) {
-    result = (value.tv_sec * MS_PER_S) + (value.tv_usec / US_PER_MS);
-    // round up -- int64_t cast is to avoid a compiler error for some GCCs
-    if (int64_t(value.tv_usec) % US_PER_MS >= (US_PER_MS / 2)) {
-      ++result;
-    }
+  static void toMilliseconds(int64_t& result,
+                             const struct timeval& value) {
+    toTicks(result, value, MS_PER_S);
+  }
+
+  /**
+   * Converts struct timespec to microseconds
+   */
+  static void toUsec(int64_t& result, const struct timespec& value) {
+    toTicks(result, value, US_PER_S);
   }
 
   /**
+   * Converts struct timeval to microseconds
+   */
+  static void toUsec(int64_t& result, const struct timeval& value) {
+    toTicks(result, value, US_PER_S);
+  }
+
+  /**
+   * Get current time as a number of arbitrary-size ticks from epoch
+   */
+  static const int64_t currentTimeTicks(int64_t ticksPerSec);
+
+  /**
    * Get current time as milliseconds from epoch
    */
-  static const int64_t currentTime();
+  static const int64_t currentTime() { return currentTimeTicks(MS_PER_S); }
+
+  /**
+   * Get current time as micros from epoch
+   */
+  static const int64_t currentTimeUsec() { return currentTimeTicks(US_PER_S); }
 };
 
 }}} // apache::thrift::concurrency