You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2012/05/24 22:45:34 UTC

git commit: TS-1240: fix race in log buffer queuing code.

Updated Branches:
  refs/heads/master 71d8eab9b -> 3505de43f


TS-1240: fix race in log buffer queuing code.


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/3505de43
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/3505de43
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/3505de43

Branch: refs/heads/master
Commit: 3505de43f5615c0f5a7ae6b62d0b0673257261c5
Parents: 71d8eab
Author: jplevyak@apache.org <jp...@apache.org>
Authored: Thu May 24 10:32:44 2012 -0700
Committer: John Plevyak <jp...@acm.org>
Committed: Thu May 24 13:45:26 2012 -0700

----------------------------------------------------------------------
 lib/ts/ink_queue.cc        |   15 -----
 lib/ts/ink_queue.h         |    6 ++
 proxy/logging/LogBuffer.cc |   10 ++--
 proxy/logging/LogBuffer.h  |   44 +++++++--------
 proxy/logging/LogObject.cc |  113 ++++++++++++++++++++++++---------------
 proxy/logging/LogObject.h  |   39 ++++----------
 6 files changed, 111 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/lib/ts/ink_queue.cc
----------------------------------------------------------------------
diff --git a/lib/ts/ink_queue.cc b/lib/ts/ink_queue.cc
index 4c8697b..e9cc992 100644
--- a/lib/ts/ink_queue.cc
+++ b/lib/ts/ink_queue.cc
@@ -50,12 +50,6 @@
 #include "ink_resource.h"
 
 
-#ifdef __x86_64__
-#define INK_QUEUE_LD64(dst,src) *((uint64_t*)&(dst)) = *((uint64_t*)&(src))
-#else
-#define INK_QUEUE_LD64(dst,src) (ink_queue_load_64((void *)&(dst), (void *)&(src)))
-#endif
-
 typedef struct _ink_freelist_list
 {
   InkFreeList *fl;
@@ -92,15 +86,6 @@ inkcoreapi volatile int64_t freelist_allocated_mem = 0;
 #define fl_memadd(_x_) \
    ink_atomic_increment64(&freelist_allocated_mem, (int64_t) (_x_));
 
-//static void ink_queue_load_64(void *dst, void *src)
-//{
-//    int32_t src_version =  (*(head_p *) src).s.version;
-//    void *src_pointer = (*(head_p *) src).s.pointer;
-//
-//    (*(head_p *) dst).s.version = src_version;
-//    (*(head_p *) dst).s.pointer = src_pointer;
-//}
-
 
 void
 ink_freelist_init(InkFreeList * f,

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/lib/ts/ink_queue.h
----------------------------------------------------------------------
diff --git a/lib/ts/ink_queue.h b/lib/ts/ink_queue.h
index e1aebbc..1176b95 100644
--- a/lib/ts/ink_queue.h
+++ b/lib/ts/ink_queue.h
@@ -65,6 +65,12 @@ extern "C"
 
   void ink_queue_load_64(void *dst, void *src);
 
+#ifdef __x86_64__
+#define INK_QUEUE_LD64(dst,src) *((uint64_t*)&(dst)) = *((uint64_t*)&(src))
+#else
+#define INK_QUEUE_LD64(dst,src) (ink_queue_load_64((void *)&(dst), (void *)&(src)))
+#endif
+
 /*
  * Generic Free List Manager
  */

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/proxy/logging/LogBuffer.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogBuffer.cc b/proxy/logging/LogBuffer.cc
index 363476d..937247b 100644
--- a/proxy/logging/LogBuffer.cc
+++ b/proxy/logging/LogBuffer.cc
@@ -129,11 +129,10 @@ LogBufferHeader::log_filename()
   -------------------------------------------------------------------------*/
 
 LogBuffer::LogBuffer(LogObject * owner, size_t size, size_t buf_align, size_t write_align):
-  next_flush(NULL),
-  next_list(NULL),
   m_size(size),
   m_buf_align(buf_align),
-  m_write_align(write_align), m_max_entries(Log::config->max_entries_per_buffer), m_owner(owner)
+  m_write_align(write_align), m_max_entries(Log::config->max_entries_per_buffer), m_owner(owner),
+  m_references(0)
 {
   size_t hdr_size;
 
@@ -159,13 +158,12 @@ LogBuffer::LogBuffer(LogObject * owner, size_t size, size_t buf_align, size_t wr
 }
 
 LogBuffer::LogBuffer(LogObject * owner, LogBufferHeader * header):
-  next_flush(NULL),
-  next_list(NULL),
   m_unaligned_buffer(NULL),
   m_buffer((char *) header),
   m_size(0),
   m_buf_align(LB_DEFAULT_ALIGN),
-  m_write_align(INK_MIN_ALIGN), m_max_entries(0), m_expiration_time(0), m_owner(owner), m_header(header)
+  m_write_align(INK_MIN_ALIGN), m_max_entries(0), m_expiration_time(0), m_owner(owner), m_header(header),
+  m_references(0)
 {
   // This constructor does not allocate a buffer because it gets it as
   // an argument. We set m_unaligned_buffer to NULL, which means that

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/proxy/logging/LogBuffer.h
----------------------------------------------------------------------
diff --git a/proxy/logging/LogBuffer.h b/proxy/logging/LogBuffer.h
index 8662975..279e69c 100644
--- a/proxy/logging/LogBuffer.h
+++ b/proxy/logging/LogBuffer.h
@@ -130,17 +130,11 @@ union LB_State
 /*-------------------------------------------------------------------------
   LogBuffer
   -------------------------------------------------------------------------*/
-#define CLASS_SIGN_LOGBUFFER 0xFACE5370 /* LogBuffer class signature */
-
 class LogBuffer
 {
 public:
-  unsigned long sign;           /* class signature (must be CLASS_SIGN_LOGBUFFER) */
-  LogBuffer *next_flush;        /* next in flush list */
-  LogBuffer *next_list;         /* next in list */
-
-  enum LB_ResultCode
-  {
+  SLINK(LogBuffer, write_link);
+  enum LB_ResultCode {
     LB_OK = 0,
     LB_FULL_NO_WRITERS,
     LB_FULL_ACTIVE_WRITERS,
@@ -199,14 +193,16 @@ public:
 
   // static functions
   static size_t max_entry_bytes();
-  static int to_ascii(LogEntryHeader * entry, LogFormatType type,
-                      char *buf, int max_len, char *symbol_str, char *printf_str,
-                      unsigned buffer_version, char *alt_format = NULL);
-  static int resolve_custom_entry(LogFieldList * fieldlist,
-                                  char *printf_str, char *read_from, char *write_to,
-                                  int write_to_len, long timestamp, long timestamp_us,
-                                  unsigned buffer_version, LogFieldList * alt_fieldlist = NULL,
-                                  char *alt_printf_str = NULL);
+  static int to_ascii(
+      LogEntryHeader * entry, LogFormatType type,
+      char *buf, int max_len, char *symbol_str, char *printf_str,
+      unsigned buffer_version, char *alt_format = NULL);
+  static int resolve_custom_entry(
+      LogFieldList * fieldlist,
+      char *printf_str, char *read_from, char *write_to,
+      int write_to_len, long timestamp, long timestamp_us,
+      unsigned buffer_version, LogFieldList * alt_fieldlist = NULL,
+      char *alt_printf_str = NULL);
 
 private:
   char *m_unaligned_buffer;     // the unaligned buffer
@@ -223,7 +219,10 @@ private:
   LogObject *m_owner;           // the LogObject that owns this buf.
   LogBufferHeader *m_header;
 
-  uint32_t m_id;                  // unique buffer id (for debugging)
+  uint32_t m_id;                // unique buffer id (for debugging)
+public:
+  volatile int m_references;    // outstanding checkout_write references.
+private:
 
   // private functions
   size_t _add_buffer_header();
@@ -270,8 +269,7 @@ public:
   This class will iterate over the entries in a LogBuffer.
   -------------------------------------------------------------------------*/
 
-class LogBufferIterator
-{
+class LogBufferIterator {
 public:
   LogBufferIterator(LogBufferHeader * header, bool in_network_order = false);
   ~LogBufferIterator();
@@ -300,10 +298,10 @@ private:
 
 inline
 LogBufferIterator::LogBufferIterator(LogBufferHeader * header, bool in_network_order)
-                  : m_in_network_order(in_network_order),
-                  m_next(0),
-                  m_iter_entry_count(0),
-                  m_buffer_entry_count(0)
+: m_in_network_order(in_network_order),
+  m_next(0),
+  m_iter_entry_count(0),
+  m_buffer_entry_count(0)
 {
   ink_debug_assert(header);
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/proxy/logging/LogObject.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogObject.cc b/proxy/logging/LogObject.cc
index 24af0af..2e21f7b 100644
--- a/proxy/logging/LogObject.cc
+++ b/proxy/logging/LogObject.cc
@@ -40,34 +40,32 @@
 #include "LogObject.h"
 
 size_t
-LogBufferManager::flush_buffers(LogBufferSink *sink)
-{
-  while (!ink_atomic_cas(&_flush_array_lock, 0, 1));
-
-  int ofa = _open_flush_array;
-  int nfb = _num_flush_buffers[ofa];
-  if (nfb) {
-    _open_flush_array = !_open_flush_array;        // switch to other array
-    _num_flush_buffers[_open_flush_array] = 0;     // clear count
-  }
-
-  _flush_array_lock = 0;
-
-  if (nfb) {
-    int i;
-
-    for (i=0 ;i < nfb; ++i) {
-      LogBuffer *flush_buffer = _flush_array[ofa][i];
-
-      flush_buffer->update_header_data();
-      sink->write(flush_buffer);
-      delete flush_buffer;
+LogBufferManager::flush_buffers(LogBufferSink *sink) {
+  SList(LogBuffer, write_link) q(write_list.popall()), new_q;
+  LogBuffer *b = NULL;
+  while ((b = q.pop())) {
+    if (b->m_references) {  // Still has outstanding references.
+      write_list.push(b);
+    } else if (_num_flush_buffers > FLUSH_ARRAY_SIZE) {
+      delete b;
+      ink_atomic_increment(&_num_flush_buffers, -1);
+      Warning("Dropping log buffer, can't keep up.");
+    } else {
+      new_q.push(b);
     }
+  }
 
-    Debug("log-logbuffer", "flushed %d buffers from array %d", nfb, ofa);
+  int flushed = 0;
+  while ((b = new_q.pop())) {
+    b->update_header_data();
+    sink->write(b);
+    delete b;
+    ink_atomic_increment(&_num_flush_buffers, -1);
+    flushed++;
   }
 
-  return nfb;
+  Debug("log-logbuffer", "flushed %d buffers", flushed);
+  return flushed;
 }
 
 /*-------------------------------------------------------------------------
@@ -85,8 +83,7 @@ LogObject::LogObject(LogFormat *format, const char *log_dir,
       m_rolling_offset_hr (rolling_offset_hr),
       m_rolling_size_mb (rolling_size_mb),
       m_last_roll_time(0),
-      m_ref_count (0),
-      m_log_buffer (NULL)
+      m_ref_count (0)
 {
     ink_debug_assert (format != NULL);
     m_format = new LogFormat(*format);
@@ -120,8 +117,9 @@ LogObject::LogObject(LogFormat *format, const char *log_dir,
                                  Log::config->overspill_report_count));
 #endif // TS_MICRO
 
-    m_log_buffer = NEW (new LogBuffer (this, Log::config->log_buffer_size));
-    ink_debug_assert (m_log_buffer != NULL);
+    LogBuffer *b = NEW (new LogBuffer (this, Log::config->log_buffer_size));
+    ink_debug_assert(b);
+    SET_FREELIST_POINTER_VERSION(m_log_buffer, b, 0);
 
     _setup_rolling(rolling_enabled, rolling_interval_sec, rolling_offset_hr, rolling_size_mb);
 
@@ -137,8 +135,7 @@ LogObject::LogObject(LogObject& rhs)
     m_signature(rhs.m_signature),
     m_rolling_interval_sec(rhs.m_rolling_interval_sec),
     m_last_roll_time(rhs.m_last_roll_time),
-    m_ref_count(0),
-    m_log_buffer(NULL)
+    m_ref_count(0)
 {
     m_format = new LogFormat(*(rhs.m_format));
 
@@ -166,8 +163,9 @@ LogObject::LogObject(LogObject& rhs)
 
     // copy gets a fresh log buffer
     //
-    m_log_buffer = NEW (new LogBuffer (this, Log::config->log_buffer_size));
-    ink_debug_assert (m_log_buffer != NULL);
+    LogBuffer *b = NEW (new LogBuffer (this, Log::config->log_buffer_size));
+    ink_debug_assert(b);
+    SET_FREELIST_POINTER_VERSION(m_log_buffer, b, 0);
 
     Debug("log-config", "exiting LogObject copy constructor, "
           "filename=%s this=%p", m_filename, this);
@@ -194,7 +192,7 @@ LogObject::~LogObject()
   ats_free(m_filename);
   ats_free(m_alt_filename);
   delete m_format;
-  delete m_log_buffer;
+  delete (LogBuffer*)FREELIST_POINTER(m_log_buffer);
 }
 
 //-----------------------------------------------------------------------------
@@ -405,16 +403,26 @@ LogObject::displayAsXML(FILE * fd, bool extended)
 
 
 LogBuffer *
-LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed)
-{
+LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed) {
   LogBuffer::LB_ResultCode result_code;
   LogBuffer *buffer;
   LogBuffer *new_buffer;
   bool retry = true;
 
   do {
-    buffer = m_log_buffer;
+    // To avoid a race condition, we keep a count of held references in the
+    // pointer's version field and add it to the buffer's m_references on swap.
+    head_p h;
+    int result = 0;
+    do {
+      INK_QUEUE_LD64(h, m_log_buffer);
+      head_p new_h;
+      SET_FREELIST_POINTER_VERSION(new_h, FREELIST_POINTER(h), FREELIST_VERSION(h) + 1);
+      result = ink_atomic_cas64((int64_t*)&m_log_buffer.data, h.data, new_h.data);
+    } while (!result);
+    buffer = (LogBuffer*)FREELIST_POINTER(h);
     result_code = buffer->checkout_write(write_offset, bytes_needed);
+    bool decremented = false;
 
     switch (result_code) {
     case LogBuffer::LB_OK:
@@ -428,11 +436,17 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed)
       // no more room in current buffer, create a new one
       new_buffer = NEW (new LogBuffer(this, Log::config->log_buffer_size));
 
-      // swap the new buffer for the old one (only this thread
-      // should be doing this, so there should be no problem)
-      //
+      // swap the new buffer for the old one
       INK_WRITE_MEMORY_BARRIER;
-      ink_atomic_swap_ptr((void *)&m_log_buffer, new_buffer);
+      head_p old_h;
+      do {
+        INK_QUEUE_LD64(old_h, m_log_buffer);
+        head_p tmp_h;
+        SET_FREELIST_POINTER_VERSION(tmp_h, new_buffer, 0);
+        result = ink_atomic_cas64((int64_t*)&m_log_buffer.data, old_h.data, tmp_h.data);
+      } while (!result);
+      if (FREELIST_POINTER(old_h) == FREELIST_POINTER(h))
+        ink_atomic_increment(&buffer->m_references, FREELIST_VERSION(old_h) - 1);
 
       if (result_code == LogBuffer::LB_FULL_NO_WRITERS) {
         // there are no writers, move the old buffer to the flush list
@@ -440,7 +454,8 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed)
         m_buffer_manager.add_to_flush_queue(buffer);
         ink_cond_signal(&Log::flush_cond);
       }
-      // fallover to retry
+      decremented = true;
+      break;
 
     case LogBuffer::LB_RETRY:
       // no more room, but another thread should be taking care of
@@ -460,7 +475,19 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed)
     default:
       ink_debug_assert(false);
     }
-
+    if (!decremented) {
+      head_p old_h;
+      do {
+        INK_QUEUE_LD64(old_h, m_log_buffer);
+        if (FREELIST_POINTER(old_h) != FREELIST_POINTER(h))
+          break;
+        head_p tmp_h;
+        SET_FREELIST_POINTER_VERSION(tmp_h, FREELIST_POINTER(h), FREELIST_VERSION(old_h) - 1);
+        result = ink_atomic_cas64((int64_t*)&m_log_buffer.data, old_h.data, tmp_h.data);
+      } while (!result);
+      if (FREELIST_POINTER(old_h) != FREELIST_POINTER(h))
+        ink_atomic_increment(&buffer->m_references, -1);
+    }
   } while (retry && write_offset);      // if write_offset is null, we do
   // not retry because we really do
   // not want to write to the buffer
@@ -750,7 +777,7 @@ LogObject::_roll_files(long last_roll_time, long time_now)
 void
 LogObject::check_buffer_expiration(long time_now)
 {
-  LogBuffer *b = m_log_buffer;
+  LogBuffer *b = (LogBuffer*)FREELIST_POINTER(m_log_buffer);
   if (b && time_now > b->expiration_time()) {
     force_new_buffer();
   }

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/proxy/logging/LogObject.h
----------------------------------------------------------------------
diff --git a/proxy/logging/LogObject.h b/proxy/logging/LogObject.h
index 1b3232a..b163696 100644
--- a/proxy/logging/LogObject.h
+++ b/proxy/logging/LogObject.h
@@ -67,40 +67,21 @@ Debug("log-api-mutex", _f)
 
 class LogBufferManager
 {
-private:
-  int _flush_array_lock;
-
-  LogBuffer *_flush_array[2][FLUSH_ARRAY_SIZE];
-
-  int _num_flush_buffers[2];          // number of buffers in queue
-  int _open_flush_array;              // index of queue accepting buffers
-
-public:
- LogBufferManager()
-   : _flush_array_lock(0), _open_flush_array(0)
-  {
-    ink_zero(_num_flush_buffers);
-  }
-
-  void add_to_flush_queue(LogBuffer * buffer)
-  {
-    while (!ink_atomic_cas(&_flush_array_lock, 0, 1));
+  private:
+    ASLL(LogBuffer, write_link) write_list;
+    int _num_flush_buffers;
 
-    if (_num_flush_buffers[_open_flush_array] < FLUSH_ARRAY_SIZE) {
-      int idx = _num_flush_buffers[_open_flush_array]++;
+  public:
+    LogBufferManager() : _num_flush_buffers(0) { }
 
-      _flush_array[_open_flush_array][idx] = buffer;
-    } else {
-      Warning("Dropping log buffer, can't keep up");
-      delete buffer;
+    void add_to_flush_queue(LogBuffer *buffer) {
+    write_list.push(buffer);
+    ink_atomic_increment(&_num_flush_buffers, 1);
     }
-    _flush_array_lock = 0;
-  }
 
-  size_t flush_buffers(LogBufferSink *sink);
+    size_t flush_buffers(LogBufferSink *sink);
 };
 
-
 class LogObject
 {
 public:
@@ -235,7 +216,7 @@ private:
 
   int m_ref_count;
 
-  LogBuffer *volatile m_log_buffer;     // current work buffer
+  volatile head_p m_log_buffer;     // current work buffer
   LogBufferManager m_buffer_manager;
 
   void generate_filenames(const char *log_dir, const char *basename, LogFileFormat file_format);