You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2012/05/24 22:45:34 UTC
git commit: TS-1240: fix race in log buffer queuing code.
Updated Branches:
refs/heads/master 71d8eab9b -> 3505de43f
TS-1240: fix race in log buffer queuing code.
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/3505de43
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/3505de43
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/3505de43
Branch: refs/heads/master
Commit: 3505de43f5615c0f5a7ae6b62d0b0673257261c5
Parents: 71d8eab
Author: jplevyak@apache.org <jp...@apache.org>
Authored: Thu May 24 10:32:44 2012 -0700
Committer: John Plevyak <jp...@acm.org>
Committed: Thu May 24 13:45:26 2012 -0700
----------------------------------------------------------------------
lib/ts/ink_queue.cc | 15 -----
lib/ts/ink_queue.h | 6 ++
proxy/logging/LogBuffer.cc | 10 ++--
proxy/logging/LogBuffer.h | 44 +++++++--------
proxy/logging/LogObject.cc | 113 ++++++++++++++++++++++++---------------
proxy/logging/LogObject.h | 39 ++++----------
6 files changed, 111 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/lib/ts/ink_queue.cc
----------------------------------------------------------------------
diff --git a/lib/ts/ink_queue.cc b/lib/ts/ink_queue.cc
index 4c8697b..e9cc992 100644
--- a/lib/ts/ink_queue.cc
+++ b/lib/ts/ink_queue.cc
@@ -50,12 +50,6 @@
#include "ink_resource.h"
-#ifdef __x86_64__
-#define INK_QUEUE_LD64(dst,src) *((uint64_t*)&(dst)) = *((uint64_t*)&(src))
-#else
-#define INK_QUEUE_LD64(dst,src) (ink_queue_load_64((void *)&(dst), (void *)&(src)))
-#endif
-
typedef struct _ink_freelist_list
{
InkFreeList *fl;
@@ -92,15 +86,6 @@ inkcoreapi volatile int64_t freelist_allocated_mem = 0;
#define fl_memadd(_x_) \
ink_atomic_increment64(&freelist_allocated_mem, (int64_t) (_x_));
-//static void ink_queue_load_64(void *dst, void *src)
-//{
-// int32_t src_version = (*(head_p *) src).s.version;
-// void *src_pointer = (*(head_p *) src).s.pointer;
-//
-// (*(head_p *) dst).s.version = src_version;
-// (*(head_p *) dst).s.pointer = src_pointer;
-//}
-
void
ink_freelist_init(InkFreeList * f,
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/lib/ts/ink_queue.h
----------------------------------------------------------------------
diff --git a/lib/ts/ink_queue.h b/lib/ts/ink_queue.h
index e1aebbc..1176b95 100644
--- a/lib/ts/ink_queue.h
+++ b/lib/ts/ink_queue.h
@@ -65,6 +65,12 @@ extern "C"
void ink_queue_load_64(void *dst, void *src);
+#ifdef __x86_64__
+#define INK_QUEUE_LD64(dst,src) *((uint64_t*)&(dst)) = *((uint64_t*)&(src))
+#else
+#define INK_QUEUE_LD64(dst,src) (ink_queue_load_64((void *)&(dst), (void *)&(src)))
+#endif
+
/*
* Generic Free List Manager
*/
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/proxy/logging/LogBuffer.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogBuffer.cc b/proxy/logging/LogBuffer.cc
index 363476d..937247b 100644
--- a/proxy/logging/LogBuffer.cc
+++ b/proxy/logging/LogBuffer.cc
@@ -129,11 +129,10 @@ LogBufferHeader::log_filename()
-------------------------------------------------------------------------*/
LogBuffer::LogBuffer(LogObject * owner, size_t size, size_t buf_align, size_t write_align):
- next_flush(NULL),
- next_list(NULL),
m_size(size),
m_buf_align(buf_align),
- m_write_align(write_align), m_max_entries(Log::config->max_entries_per_buffer), m_owner(owner)
+ m_write_align(write_align), m_max_entries(Log::config->max_entries_per_buffer), m_owner(owner),
+ m_references(0)
{
size_t hdr_size;
@@ -159,13 +158,12 @@ LogBuffer::LogBuffer(LogObject * owner, size_t size, size_t buf_align, size_t wr
}
LogBuffer::LogBuffer(LogObject * owner, LogBufferHeader * header):
- next_flush(NULL),
- next_list(NULL),
m_unaligned_buffer(NULL),
m_buffer((char *) header),
m_size(0),
m_buf_align(LB_DEFAULT_ALIGN),
- m_write_align(INK_MIN_ALIGN), m_max_entries(0), m_expiration_time(0), m_owner(owner), m_header(header)
+ m_write_align(INK_MIN_ALIGN), m_max_entries(0), m_expiration_time(0), m_owner(owner), m_header(header),
+ m_references(0)
{
// This constructor does not allocate a buffer because it gets it as
// an argument. We set m_unaligned_buffer to NULL, which means that
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/proxy/logging/LogBuffer.h
----------------------------------------------------------------------
diff --git a/proxy/logging/LogBuffer.h b/proxy/logging/LogBuffer.h
index 8662975..279e69c 100644
--- a/proxy/logging/LogBuffer.h
+++ b/proxy/logging/LogBuffer.h
@@ -130,17 +130,11 @@ union LB_State
/*-------------------------------------------------------------------------
LogBuffer
-------------------------------------------------------------------------*/
-#define CLASS_SIGN_LOGBUFFER 0xFACE5370 /* LogBuffer class signature */
-
class LogBuffer
{
public:
- unsigned long sign; /* class signature (must be CLASS_SIGN_LOGBUFFER) */
- LogBuffer *next_flush; /* next in flush list */
- LogBuffer *next_list; /* next in list */
-
- enum LB_ResultCode
- {
+ SLINK(LogBuffer, write_link);
+ enum LB_ResultCode {
LB_OK = 0,
LB_FULL_NO_WRITERS,
LB_FULL_ACTIVE_WRITERS,
@@ -199,14 +193,16 @@ public:
// static functions
static size_t max_entry_bytes();
- static int to_ascii(LogEntryHeader * entry, LogFormatType type,
- char *buf, int max_len, char *symbol_str, char *printf_str,
- unsigned buffer_version, char *alt_format = NULL);
- static int resolve_custom_entry(LogFieldList * fieldlist,
- char *printf_str, char *read_from, char *write_to,
- int write_to_len, long timestamp, long timestamp_us,
- unsigned buffer_version, LogFieldList * alt_fieldlist = NULL,
- char *alt_printf_str = NULL);
+ static int to_ascii(
+ LogEntryHeader * entry, LogFormatType type,
+ char *buf, int max_len, char *symbol_str, char *printf_str,
+ unsigned buffer_version, char *alt_format = NULL);
+ static int resolve_custom_entry(
+ LogFieldList * fieldlist,
+ char *printf_str, char *read_from, char *write_to,
+ int write_to_len, long timestamp, long timestamp_us,
+ unsigned buffer_version, LogFieldList * alt_fieldlist = NULL,
+ char *alt_printf_str = NULL);
private:
char *m_unaligned_buffer; // the unaligned buffer
@@ -223,7 +219,10 @@ private:
LogObject *m_owner; // the LogObject that owns this buf.
LogBufferHeader *m_header;
- uint32_t m_id; // unique buffer id (for debugging)
+ uint32_t m_id; // unique buffer id (for debugging)
+public:
+ volatile int m_references; // outstanding checkout_write references.
+private:
// private functions
size_t _add_buffer_header();
@@ -270,8 +269,7 @@ public:
This class will iterate over the entries in a LogBuffer.
-------------------------------------------------------------------------*/
-class LogBufferIterator
-{
+class LogBufferIterator {
public:
LogBufferIterator(LogBufferHeader * header, bool in_network_order = false);
~LogBufferIterator();
@@ -300,10 +298,10 @@ private:
inline
LogBufferIterator::LogBufferIterator(LogBufferHeader * header, bool in_network_order)
- : m_in_network_order(in_network_order),
- m_next(0),
- m_iter_entry_count(0),
- m_buffer_entry_count(0)
+: m_in_network_order(in_network_order),
+ m_next(0),
+ m_iter_entry_count(0),
+ m_buffer_entry_count(0)
{
ink_debug_assert(header);
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/proxy/logging/LogObject.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogObject.cc b/proxy/logging/LogObject.cc
index 24af0af..2e21f7b 100644
--- a/proxy/logging/LogObject.cc
+++ b/proxy/logging/LogObject.cc
@@ -40,34 +40,32 @@
#include "LogObject.h"
size_t
-LogBufferManager::flush_buffers(LogBufferSink *sink)
-{
- while (!ink_atomic_cas(&_flush_array_lock, 0, 1));
-
- int ofa = _open_flush_array;
- int nfb = _num_flush_buffers[ofa];
- if (nfb) {
- _open_flush_array = !_open_flush_array; // switch to other array
- _num_flush_buffers[_open_flush_array] = 0; // clear count
- }
-
- _flush_array_lock = 0;
-
- if (nfb) {
- int i;
-
- for (i=0 ;i < nfb; ++i) {
- LogBuffer *flush_buffer = _flush_array[ofa][i];
-
- flush_buffer->update_header_data();
- sink->write(flush_buffer);
- delete flush_buffer;
+LogBufferManager::flush_buffers(LogBufferSink *sink) {
+ SList(LogBuffer, write_link) q(write_list.popall()), new_q;
+ LogBuffer *b = NULL;
+ while ((b = q.pop())) {
+ if (b->m_references) { // Still has outstanding references.
+ write_list.push(b);
+ } else if (_num_flush_buffers > FLUSH_ARRAY_SIZE) {
+ delete b;
+ ink_atomic_increment(&_num_flush_buffers, -1);
+ Warning("Dropping log buffer, can't keep up.");
+ } else {
+ new_q.push(b);
}
+ }
- Debug("log-logbuffer", "flushed %d buffers from array %d", nfb, ofa);
+ int flushed = 0;
+ while ((b = new_q.pop())) {
+ b->update_header_data();
+ sink->write(b);
+ delete b;
+ ink_atomic_increment(&_num_flush_buffers, -1);
+ flushed++;
}
- return nfb;
+ Debug("log-logbuffer", "flushed %d buffers", flushed);
+ return flushed;
}
/*-------------------------------------------------------------------------
@@ -85,8 +83,7 @@ LogObject::LogObject(LogFormat *format, const char *log_dir,
m_rolling_offset_hr (rolling_offset_hr),
m_rolling_size_mb (rolling_size_mb),
m_last_roll_time(0),
- m_ref_count (0),
- m_log_buffer (NULL)
+ m_ref_count (0)
{
ink_debug_assert (format != NULL);
m_format = new LogFormat(*format);
@@ -120,8 +117,9 @@ LogObject::LogObject(LogFormat *format, const char *log_dir,
Log::config->overspill_report_count));
#endif // TS_MICRO
- m_log_buffer = NEW (new LogBuffer (this, Log::config->log_buffer_size));
- ink_debug_assert (m_log_buffer != NULL);
+ LogBuffer *b = NEW (new LogBuffer (this, Log::config->log_buffer_size));
+ ink_debug_assert(b);
+ SET_FREELIST_POINTER_VERSION(m_log_buffer, b, 0);
_setup_rolling(rolling_enabled, rolling_interval_sec, rolling_offset_hr, rolling_size_mb);
@@ -137,8 +135,7 @@ LogObject::LogObject(LogObject& rhs)
m_signature(rhs.m_signature),
m_rolling_interval_sec(rhs.m_rolling_interval_sec),
m_last_roll_time(rhs.m_last_roll_time),
- m_ref_count(0),
- m_log_buffer(NULL)
+ m_ref_count(0)
{
m_format = new LogFormat(*(rhs.m_format));
@@ -166,8 +163,9 @@ LogObject::LogObject(LogObject& rhs)
// copy gets a fresh log buffer
//
- m_log_buffer = NEW (new LogBuffer (this, Log::config->log_buffer_size));
- ink_debug_assert (m_log_buffer != NULL);
+ LogBuffer *b = NEW (new LogBuffer (this, Log::config->log_buffer_size));
+ ink_debug_assert(b);
+ SET_FREELIST_POINTER_VERSION(m_log_buffer, b, 0);
Debug("log-config", "exiting LogObject copy constructor, "
"filename=%s this=%p", m_filename, this);
@@ -194,7 +192,7 @@ LogObject::~LogObject()
ats_free(m_filename);
ats_free(m_alt_filename);
delete m_format;
- delete m_log_buffer;
+ delete (LogBuffer*)FREELIST_POINTER(m_log_buffer);
}
//-----------------------------------------------------------------------------
@@ -405,16 +403,26 @@ LogObject::displayAsXML(FILE * fd, bool extended)
LogBuffer *
-LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed)
-{
+LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed) {
LogBuffer::LB_ResultCode result_code;
LogBuffer *buffer;
LogBuffer *new_buffer;
bool retry = true;
do {
- buffer = m_log_buffer;
+ // To avoid a race condition, we keep a count of held references in
+ // the pointer itself and add this to the buffer's m_references.
+ head_p h;
+ int result = 0;
+ do {
+ INK_QUEUE_LD64(h, m_log_buffer);
+ head_p new_h;
+ SET_FREELIST_POINTER_VERSION(new_h, FREELIST_POINTER(h), FREELIST_VERSION(h) + 1);
+ result = ink_atomic_cas64((int64_t*)&m_log_buffer.data, h.data, new_h.data);
+ } while (!result);
+ buffer = (LogBuffer*)FREELIST_POINTER(h);
result_code = buffer->checkout_write(write_offset, bytes_needed);
+ bool decremented = false;
switch (result_code) {
case LogBuffer::LB_OK:
@@ -428,11 +436,17 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed)
// no more room in current buffer, create a new one
new_buffer = NEW (new LogBuffer(this, Log::config->log_buffer_size));
- // swap the new buffer for the old one (only this thread
- // should be doing this, so there should be no problem)
- //
+ // swap the new buffer for the old one
INK_WRITE_MEMORY_BARRIER;
- ink_atomic_swap_ptr((void *)&m_log_buffer, new_buffer);
+ head_p old_h;
+ do {
+ INK_QUEUE_LD64(old_h, m_log_buffer);
+ head_p tmp_h;
+ SET_FREELIST_POINTER_VERSION(tmp_h, new_buffer, 0);
+ result = ink_atomic_cas64((int64_t*)&m_log_buffer.data, old_h.data, tmp_h.data);
+ } while (!result);
+ if (FREELIST_POINTER(old_h) == FREELIST_POINTER(h))
+ ink_atomic_increment(&buffer->m_references, FREELIST_VERSION(old_h) - 1);
if (result_code == LogBuffer::LB_FULL_NO_WRITERS) {
// there are no writers, move the old buffer to the flush list
@@ -440,7 +454,8 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed)
m_buffer_manager.add_to_flush_queue(buffer);
ink_cond_signal(&Log::flush_cond);
}
- // fallover to retry
+ decremented = true;
+ break;
case LogBuffer::LB_RETRY:
// no more room, but another thread should be taking care of
@@ -460,7 +475,19 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed)
default:
ink_debug_assert(false);
}
-
+ if (!decremented) {
+ head_p old_h;
+ do {
+ INK_QUEUE_LD64(old_h, m_log_buffer);
+ if (FREELIST_POINTER(old_h) != FREELIST_POINTER(h))
+ break;
+ head_p tmp_h;
+ SET_FREELIST_POINTER_VERSION(tmp_h, FREELIST_POINTER(h), FREELIST_VERSION(old_h) - 1);
+ result = ink_atomic_cas64((int64_t*)&m_log_buffer.data, old_h.data, tmp_h.data);
+ } while (!result);
+ if (FREELIST_POINTER(old_h) != FREELIST_POINTER(h))
+ ink_atomic_increment(&buffer->m_references, -1);
+ }
} while (retry && write_offset); // if write_offset is null, we do
// not retry because we really do
// not want to write to the buffer
@@ -750,7 +777,7 @@ LogObject::_roll_files(long last_roll_time, long time_now)
void
LogObject::check_buffer_expiration(long time_now)
{
- LogBuffer *b = m_log_buffer;
+ LogBuffer *b = (LogBuffer*)FREELIST_POINTER(m_log_buffer);
if (b && time_now > b->expiration_time()) {
force_new_buffer();
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/proxy/logging/LogObject.h
----------------------------------------------------------------------
diff --git a/proxy/logging/LogObject.h b/proxy/logging/LogObject.h
index 1b3232a..b163696 100644
--- a/proxy/logging/LogObject.h
+++ b/proxy/logging/LogObject.h
@@ -67,40 +67,21 @@ Debug("log-api-mutex", _f)
class LogBufferManager
{
-private:
- int _flush_array_lock;
-
- LogBuffer *_flush_array[2][FLUSH_ARRAY_SIZE];
-
- int _num_flush_buffers[2]; // number of buffers in queue
- int _open_flush_array; // index of queue accepting buffers
-
-public:
- LogBufferManager()
- : _flush_array_lock(0), _open_flush_array(0)
- {
- ink_zero(_num_flush_buffers);
- }
-
- void add_to_flush_queue(LogBuffer * buffer)
- {
- while (!ink_atomic_cas(&_flush_array_lock, 0, 1));
+ private:
+ ASLL(LogBuffer, write_link) write_list;
+ int _num_flush_buffers;
- if (_num_flush_buffers[_open_flush_array] < FLUSH_ARRAY_SIZE) {
- int idx = _num_flush_buffers[_open_flush_array]++;
+ public:
+ LogBufferManager() : _num_flush_buffers(0) { }
- _flush_array[_open_flush_array][idx] = buffer;
- } else {
- Warning("Dropping log buffer, can't keep up");
- delete buffer;
+ void add_to_flush_queue(LogBuffer *buffer) {
+ write_list.push(buffer);
+ ink_atomic_increment(&_num_flush_buffers, 1);
}
- _flush_array_lock = 0;
- }
- size_t flush_buffers(LogBufferSink *sink);
+ size_t flush_buffers(LogBufferSink *sink);
};
-
class LogObject
{
public:
@@ -235,7 +216,7 @@ private:
int m_ref_count;
- LogBuffer *volatile m_log_buffer; // current work buffer
+ volatile head_p m_log_buffer; // current work buffer
LogBufferManager m_buffer_manager;
void generate_filenames(const char *log_dir, const char *basename, LogFileFormat file_format);