You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by yu...@apache.org on 2013/08/16 04:44:37 UTC

git commit: TS-2089: introduce configurable collation preproc threads

Updated Branches:
  refs/heads/master e0ec5304d -> 8a1112881


TS-2089: introduce configurable collation preproc threads

We found that the CPU usage of the logging thread could easily reach 100% on
a collation host, while disk IO stayed low at the same time.

The bottleneck of the logging thread is that some preprocessing jobs, such as
converting a LogBuffer to ASCII text, consume a lot of CPU time. Worse
still, the write() operation blocks the logging thread.

So this patch splits the logging thread into two parts:
1) Configurable preproc threads, which are responsible for all of the
   preparation work, and then forward the preprocessed buffers to the flush
   thread, or send them to CollationClient/HostSM.

2) One flush thread, which consumes the preprocessed buffers and writes them
   to disk. In our testing, one flush thread was sufficient.

TODO: This patch supports only one flush thread; it can be improved to
      "one flush thread per file/disk" in the future.

== How to configure ==
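The number of preproc threads is 1 by default.

Set the "proxy.config.log.collation_preproc_threads" option (valid range
1-128) to change it. The threads are created only once, at startup, so a
restart is needed for a new value to take effect.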

Signed-off-by: Yunkai Zhang <qi...@taobao.com>


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/8a111288
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/8a111288
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/8a111288

Branch: refs/heads/master
Commit: 8a1112881813b5a09e0de8f51770f99337febcfc
Parents: e0ec530
Author: Yunkai Zhang <qi...@taobao.com>
Authored: Sun Aug 11 16:42:32 2013 +0800
Committer: Yunkai Zhang <qi...@taobao.com>
Committed: Fri Aug 16 10:41:16 2013 +0800

----------------------------------------------------------------------
 CHANGES                               |   3 +
 mgmt/RecordsConfig.cc                 |   2 +
 mgmt/cli/ShowCmd.cc                   |   3 +
 proxy/logging/Log.cc                  | 257 ++++++++++++++++++++++++-----
 proxy/logging/Log.h                   |  46 +++++-
 proxy/logging/LogBufferSink.h         |   9 +-
 proxy/logging/LogCollationClientSM.cc |   6 +-
 proxy/logging/LogCollationHostSM.cc   |   4 +-
 proxy/logging/LogConfig.cc            |  19 ++-
 proxy/logging/LogConfig.h             |   1 +
 proxy/logging/LogFile.cc              | 121 ++++++--------
 proxy/logging/LogFile.h               |  20 +--
 proxy/logging/LogHost.cc              |  66 ++++----
 proxy/logging/LogHost.h               |  14 +-
 proxy/logging/LogObject.cc            |  68 ++++----
 proxy/logging/LogObject.h             |  43 +++--
 16 files changed, 450 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6b9a3bb..c613f00 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,9 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache Traffic Server 3.5.0
 
+
+  *) TS-2089: introduce configurable collation preproc threads
+
   *) [TS-2132, TS-2131] ${libexecdir} and $(localstatedir} chowned
    needlessly chowned to to ATS' user.
    Author: Tomasz Kuzemko <to...@kuzemko.net>

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/RecordsConfig.cc
----------------------------------------------------------------------
diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
index cfc2267..9402581 100644
--- a/mgmt/RecordsConfig.cc
+++ b/mgmt/RecordsConfig.cc
@@ -1147,6 +1147,8 @@ RecordElement RecordsConfig[] = {
   ,
   {RECT_CONFIG, "proxy.config.log.collation_max_send_buffers", RECD_INT, "16", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
   ,
+  {RECT_CONFIG, "proxy.config.log.collation_preproc_threads", RECD_INT, "1", RECU_DYNAMIC, RR_REQUIRED, RECC_INT, "[1-128]", RECA_NULL}
+  ,
   {RECT_CONFIG, "proxy.config.log.rolling_enabled", RECD_INT, "1", RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-4]", RECA_NULL}
   ,
   {RECT_CONFIG, "proxy.config.log.rolling_interval_sec", RECD_INT, "86400", RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/cli/ShowCmd.cc
----------------------------------------------------------------------
diff --git a/mgmt/cli/ShowCmd.cc b/mgmt/cli/ShowCmd.cc
index 0798c43..ed71560 100644
--- a/mgmt/cli/ShowCmd.cc
+++ b/mgmt/cli/ShowCmd.cc
@@ -1555,6 +1555,7 @@ ShowLogging()
   TSInt collation_port = -1;
   TSString collation_secret = NULL;
   TSInt host_tag = 0;
+  TSInt preproc_threads = 0;
   TSInt orphan_space = -1;
 
   TSInt squid_log = 0;
@@ -1596,6 +1597,7 @@ ShowLogging()
   Cli_RecordGetString("proxy.config.log.collation_secret", &collation_secret);
   Cli_RecordGetInt("proxy.config.log.collation_host_tagged", &host_tag);
   Cli_RecordGetInt("proxy.config.log.max_space_mb_for_orphan_logs", &orphan_space);
+  Cli_RecordGetInt("proxy.config.log.collation_preproc_threads", &preproc_threads);
 
   Cli_RecordGetInt("proxy.config.log.squid_log_enabled", &squid_log);
   Cli_RecordGetInt("proxy.config.log.squid_log_is_ascii", &is_ascii);
@@ -1657,6 +1659,7 @@ ShowLogging()
   Cli_Printf("  Port ----------------------------------- %d\n", collation_port);
   Cli_Printf("  Secret --------------------------------- %s\n", collation_secret);
   Cli_PrintEnable("  Host Tagged ---------------------------- ", host_tag);
+  Cli_PrintEnable("  Preproc Threads ------------------------ ", preproc_threads);
   Cli_Printf("  Space Limit for Orphan Files ----------- %d MB\n", orphan_space);
 
   Cli_PrintEnable("\nSquid Format ----------------------------- ", squid_log);

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/Log.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
index 94d5625..1361978 100644
--- a/proxy/logging/Log.cc
+++ b/proxy/logging/Log.cc
@@ -77,16 +77,18 @@ size_t Log::numInactiveObjects;
 size_t Log::maxInactiveObjects;
 
 // Flush thread stuff
-volatile unsigned long Log::flush_counter = 0;
-ink_mutex Log::flush_mutex;
-ink_cond Log::flush_cond;
-ink_thread Log::flush_thread;
+ink_mutex *Log::preproc_mutex;
+ink_cond *Log::preproc_cond;
+ink_mutex *Log::flush_mutex;
+ink_cond *Log::flush_cond;
+InkAtomicList *Log::flush_data_list;
 
 // Collate thread stuff
 ink_mutex Log::collate_mutex;
 ink_cond Log::collate_cond;
 ink_thread Log::collate_thread;
 int Log::collation_accept_file_descriptor;
+int Log::collation_preproc_threads;
 int Log::collation_port;
 
 // Log private objects
@@ -179,16 +181,28 @@ Log::add_to_inactive(LogObject * object)
 
 struct PeriodicWakeup;
 typedef int (PeriodicWakeup::*PeriodicWakeupHandler)(int, void *);
-struct PeriodicWakeup : Continuation {
+struct PeriodicWakeup : Continuation  // signals every logging preproc/flush condvar each time it fires
+{
+  int m_preproc_threads;  // number of Log::preproc_cond entries to signal
+  int m_flush_threads;    // number of Log::flush_cond entries to signal
+
   int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
   {
-    ink_cond_signal (&Log::flush_cond);
-    return EVENT_CONT;
+      for (int i = 0; i < m_preproc_threads; i++) {  // wake each preproc thread
+        ink_cond_signal (&Log::preproc_cond[i]);
+      }
+      for (int i = 0; i < m_flush_threads; i++) {    // wake each flush thread
+        ink_cond_signal (&Log::flush_cond[i]);
+      }
+      return EVENT_CONT;
   }
 
-  PeriodicWakeup () : Continuation (new_ProxyMutex())
+  PeriodicWakeup (int preproc_threads, int flush_threads) :  // counts must not exceed the condvar array sizes
+    Continuation (new_ProxyMutex()),
+    m_preproc_threads(preproc_threads),
+    m_flush_threads(flush_threads)
   {
-    SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
+      SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
   }
 };
 
@@ -286,15 +300,33 @@ Log::periodic_tasks(long time_now)
 /*-------------------------------------------------------------------------
   MAIN INTERFACE
   -------------------------------------------------------------------------*/
+struct LoggingPreprocContinuation: public Continuation  // body of one spawned [LOG_PREPROC n] thread
+{
+  int m_idx;  // index into Log::preproc_mutex / Log::preproc_cond
+
+  int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
+  {
+    Log::preproc_thread_main((void *)&m_idx);  // loops forever; does not return in practice
+    return 0;
+  }
+
+  LoggingPreprocContinuation(int idx):Continuation(NULL), m_idx(idx)
+  {
+    SET_HANDLER(&LoggingPreprocContinuation::mainEvent);
+  }
+};
+
 struct LoggingFlushContinuation: public Continuation
 {
+  int m_idx;
+
   int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
   {
-    Log::flush_thread_main(NULL);
+    Log::flush_thread_main((void *)&m_idx);
     return 0;
   }
 
-  LoggingFlushContinuation():Continuation(NULL)
+  LoggingFlushContinuation(int idx):Continuation(NULL), m_idx(idx)
   {
     SET_HANDLER(&LoggingFlushContinuation::mainEvent);
   }
@@ -910,6 +942,7 @@ Log::init(int flags)
   numInactiveObjects = 0;
   inactive_objects = new LogObject*[maxInactiveObjects];
 
+  collation_preproc_threads = 1;
   collation_accept_file_descriptor = NO_FD;
 
   // store the configuration flags
@@ -931,6 +964,7 @@ Log::init(int flags)
 
     config->read_configuration_variables();
     collation_port = config->collation_port;
+    collation_preproc_threads = config->collation_preproc_threads;
 
     if (config_flags & STANDALONE_COLLATOR) {
       logging_mode = LOG_TRANSACTIONS_ONLY;
@@ -959,8 +993,8 @@ Log::init(int flags)
     create_threads();
 
 #ifndef INK_SINGLE_THREADED
-    eventProcessor.schedule_every(NEW (new PeriodicWakeup()), HRTIME_SECOND,
-        ET_CALL);
+    eventProcessor.schedule_every(NEW (new PeriodicWakeup(collation_preproc_threads, 1)),
+                                  HRTIME_SECOND, ET_CALL);
 #endif
     init_status |= PERIODIC_WAKEUP_SCHEDULED;
 
@@ -1001,9 +1035,16 @@ Log::init_when_enabled()
     // setup global scrap object
     //
     global_scrap_format = NEW(new LogFormat(TEXT_LOG));
-    global_scrap_object = NEW(new LogObject(global_scrap_format, Log::config->logfile_dir, "scrapfile.log", BINARY_LOG,
-                                            NULL, Log::config->rolling_enabled, Log::config->rolling_interval_sec,
-                                            Log::config->rolling_offset_hr, Log::config->rolling_size_mb));
+    global_scrap_object =
+      NEW(new LogObject(global_scrap_format,
+                        Log::config->logfile_dir,
+                        "scrapfile.log",
+                        BINARY_LOG, NULL,
+                        Log::config->rolling_enabled,
+                        Log::config->collation_preproc_threads,
+                        Log::config->rolling_interval_sec,
+                        Log::config->rolling_offset_hr,
+                        Log::config->rolling_size_mb));
 
     // create the flush thread and the collation thread
     //
@@ -1030,15 +1071,43 @@ Log::create_threads()
 
   REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
   if (!(init_status & THREADS_CREATED)) {
-    // start the flush thread
+
+    char desc[64];
+    preproc_mutex = new ink_mutex[collation_preproc_threads];
+    preproc_cond = new ink_cond[collation_preproc_threads];
+
+    size_t stacksize;
+    REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
+
+    // start the preproc threads
     //
     // no need for the conditional var since it will be relying on
     // on the event system.
-    ink_mutex_init(&flush_mutex, "Flush thread mutex");
-    ink_cond_init(&flush_cond);
-    Continuation *flush_continuation = NEW(new LoggingFlushContinuation);
-    Event *flush_event = eventProcessor.spawn_thread(flush_continuation, "[LOGGING]", stacksize);
-    flush_thread = flush_event->ethread->tid;
+    for (int i = 0; i < collation_preproc_threads; i++) {
+      sprintf(desc, "Logging preproc thread mutex[%d]", i);
+      ink_mutex_init(&preproc_mutex[i], desc);
+      ink_cond_init(&preproc_cond[i]);
+      Continuation *preproc_cont = NEW(new LoggingPreprocContinuation(i));
+      sprintf(desc, "[LOG_PREPROC %d]", i);
+      eventProcessor.spawn_thread(preproc_cont, desc, stacksize);
+    }
+
+    // Now, only one flush thread is supported.
+    // TODO: Enable multiple flush threads, such as
+    //       one flush thread per file.
+    //
+    flush_mutex = new ink_mutex;
+    flush_cond = new ink_cond;
+    flush_data_list = new InkAtomicList;
+
+    sprintf(desc, "Logging flush thread mutex");
+    ink_mutex_init(flush_mutex, desc);
+    ink_cond_init(flush_cond);
+    sprintf(desc, "Logging flush buffer list");
+    ink_atomiclist_init(flush_data_list, desc, 0);
+    Continuation *flush_cont = NEW(new LoggingFlushContinuation(0));
+    sprintf(desc, "[LOG_FLUSH]");
+    eventProcessor.spawn_thread(flush_cont, desc, stacksize);
 
 #if !defined(IOCORE_LOG_COLLATION)
     // start the collation thread if we are not using iocore log collation
@@ -1164,52 +1233,150 @@ Log::va_error(char *format, va_list ap)
 }
 
 /*-------------------------------------------------------------------------
-  Log::flush_thread_main
+  Log::preproc_thread_main
 
-  This function defines the functionality of the logging flush thread,
-  whose purpose is to consume LogBuffer objects from the
-  global_buffer_full_list, process them, and destroy them.
+  This function defines the functionality of the logging flush prepare
+  thread, whose purpose is to consume LogBuffer objects from the
+  global_buffer_full_list, do some prepare work(such as convert to ascii),
+  and then forward to flush thread.
   -------------------------------------------------------------------------*/
 
 void *
-Log::flush_thread_main(void * /* args ATS_UNUSED */)
+Log::preproc_thread_main(void *args)  // args points to this thread's int index
 {
-  time_t now, last_time = 0;
-  size_t buffers_flushed;
+  size_t buffers_preproced;
+  int idx = *(int *)args;  // selects preproc_mutex[idx] / preproc_cond[idx]
 
-  Debug("log-flush", "Log flush thread is alive ...");
+  Debug("log-preproc", "log preproc thread is alive ...");
 
-  while (true) {
-    buffers_flushed = 0;
+  ink_mutex_acquire(&preproc_mutex[idx]);  // held except while blocked in ink_cond_wait below
 
-    buffers_flushed = config->log_object_manager.flush_buffers();
+  while (true) {
+    buffers_preproced = config->log_object_manager.preproc_buffers(idx);
 
     if (error_log)
-      buffers_flushed += error_log->flush_buffers();
+      buffers_preproced += error_log->preproc_buffers(idx);
 
     // config->increment_space_used(bytes_to_disk);
     // TODO: the bytes_to_disk should be set to Log
 
-    Debug("log-flush","%zu buffers flushed this round", buffers_flushed);
+    Debug("log-preproc","%zu buffers preprocessed this round",
+          buffers_preproced);
+
+    // wait for more work; a spurious wake-up is ok since we'll just
+    // check the queue and find there is nothing to do, then wait
+    // again.
+    //
+    ink_cond_wait (&preproc_cond[idx], &preproc_mutex[idx]);
+  }
+
+  /* NOTREACHED */
+  ink_mutex_release(&preproc_mutex[idx]);
+  return NULL;
+}
+
+void *
+Log::flush_thread_main(void * /* args ATS_UNUSED */)  // single flush thread: drains flush_data_list forever
+{
+  char *buf;
+  LogFile *logfile;
+  LogBuffer *logbuffer;
+  LogFlushData *fdata;
+  ink_hrtime now, last_time = 0;
+  int len, bytes_written, total_bytes;
+  SLL<LogFlushData, LogFlushData::Link_link> link, invert_link;  // used to reverse the popped list
+
+  ink_mutex_acquire(flush_mutex);  // held except while blocked in ink_cond_wait below
+
+  while (true) {
+    fdata = (LogFlushData *) ink_atomiclist_popall(flush_data_list);
+
+    // invert the list
+    //
+    link.head = fdata;
+    while ((fdata = link.pop()))
+      invert_link.push(fdata);
+
+    // process each flush data
+    //
+    while ((fdata = invert_link.pop())) {
+      buf = NULL;
+      total_bytes = 0;
+      bytes_written = 0;
+      logfile = fdata->m_logfile;
+
+      if (logfile->m_file_format == BINARY_LOG) {
+
+        logbuffer = (LogBuffer *)fdata->m_data;
+        LogBufferHeader *buffer_header = logbuffer->header();
+
+        buf = (char *)buffer_header;
+        total_bytes = buffer_header->byte_count;
+
+      } else if (logfile->m_file_format == ASCII_LOG
+                 || logfile->m_file_format == ASCII_PIPE){
+
+        buf = (char *)fdata->m_data;
+        total_bytes = fdata->m_len;
+
+      } else {
+        ink_release_assert(!"Unknown file format type!");
+      }
+
+      // make sure we're open & ready to write
+      logfile->check_fd();
+      if (!logfile->is_open()) {
+        Warning("File:%s was closed, have dropped (%d) bytes.",
+                logfile->m_name, total_bytes);
+        delete fdata;
+        continue;
+      }
+
+      // write *all* data to target file as much as possible
+      //
+      while (bytes_written < total_bytes) {  // never loops on a negative/unset length
+        if (Log::config->logging_space_exhausted) {
+          Warning("logging space exhausted, failed to write file:%s, have dropped (%d) bytes.",
+                  logfile->m_name, (total_bytes - bytes_written));
+          break;
+        }
+
+        len = ::write(logfile->m_fd, &buf[bytes_written],
+                      total_bytes - bytes_written);
+        if (len < 0) {
+          Error("Failed to write log to %s: [tried %d, wrote %d, %s]",
+                logfile->m_name, total_bytes, bytes_written, strerror(errno));
+          // stop retrying; the unwritten rest of this buffer is dropped
+          break;
+        }
+        bytes_written += len;
+      }
+
+      ink_atomic_increment(&logfile->m_bytes_written, bytes_written);
+
+      delete fdata;  // ~LogFlushData releases the LogBuffer / text buffer
+    }
 
     // Time to work on periodic events??
     //
-    now = time(NULL);
+    now = ink_get_hrtime() / HRTIME_SECOND;
     if (now > last_time) {
-      if ((now % PERIODIC_TASKS_INTERVAL) == 0) {
-        Debug("log-flush", "periodic tasks for %" PRId64, (int64_t)now);
+      if ((now % (PERIODIC_TASKS_INTERVAL)) == 0) {
+        Debug("log-flush", "periodic tasks for %" PRId64, (int64_t)now);
         periodic_tasks(now);
       }
-      last_time = (now = time(NULL));
+      last_time = (now = ink_get_hrtime() / HRTIME_SECOND);
     }
+
     // wait for more work; a spurious wake-up is ok since we'll just
     // check the queue and find there is nothing to do, then wait
     // again.
     //
-    ink_mutex_try_acquire(&flush_mutex); // acquire if not already acquired, so ink_cond_wait doesn't fail us
-    ink_cond_wait (&flush_cond, &flush_mutex);
+    ink_cond_wait(flush_cond, flush_mutex);
   }
+
   /* NOTREACHED */
+  ink_mutex_release(flush_mutex);
   return NULL;
 }
 
@@ -1234,6 +1401,8 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
 
   Debug("log-thread", "Log collation thread is alive ...");
 
+  ink_mutex_acquire(&collate_mutex);
+
   while (true) {
     ink_assert(Log::config != NULL);
 
@@ -1321,7 +1490,9 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
     Debug("log", "no longer collation host, deleting LogSock");
     delete sock;
   }
+
   /* NOTREACHED */
+  ink_mutex_release(&collate_mutex);
   return NULL;
 }
 
@@ -1355,8 +1526,10 @@ Log::match_logobject(LogBufferHeader * header)
       obj = NEW(new LogObject(fmt, Log::config->logfile_dir,
                               header->log_filename(), file_format, NULL,
                               Log::config->rolling_enabled,
+                              Log::config->collation_preproc_threads,
                               Log::config->rolling_interval_sec,
-                              Log::config->rolling_offset_hr, Log::config->rolling_size_mb));
+                              Log::config->rolling_offset_hr,
+                              Log::config->rolling_size_mb));
 
       obj->set_remote_flag();
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/Log.h
----------------------------------------------------------------------
diff --git a/proxy/logging/Log.h b/proxy/logging/Log.h
index c6b4af7..2f395ca 100644
--- a/proxy/logging/Log.h
+++ b/proxy/logging/Log.h
@@ -296,6 +296,8 @@
 #include <stdarg.h>
 #include "libts.h"
 #include "P_RecProcess.h"
+#include "LogFile.h"
+#include "LogBuffer.h"
 
 class LogAccess;
 class LogFieldList;
@@ -303,12 +305,45 @@ class LogFilterList;
 class LogFormatList;
 //class LogBufferList; vl: we don't need it here
 struct LogBufferHeader;
+class LogFile;
 class LogBuffer;
 class LogFormat;
 class LogObject;
 class LogConfig;
 class TextLogObject;
 
+class LogFlushData  // one unit of work for the flush thread; owns m_data (do not copy)
+{
+public:
+  LINK(LogFlushData, link);
+  LogFile *m_logfile;     // destination file; its format also selects how m_data is freed
+  LogBuffer *logbuffer;   // scratch pointer, only assigned by the destructor
+  void *m_data;           // owned: a LogBuffer (BINARY_LOG) or a buffer released with free() (ASCII)
+  int m_len;              // byte length of m_data, read for ASCII formats; defaults to -1
+
+  LogFlushData(LogFile *logfile, void *data, int len = -1):
+    m_logfile(logfile), logbuffer(NULL), m_data(data), m_len(len)
+  {
+  }
+
+  ~LogFlushData()  // NOTE(review): implicit copy would double-free m_data; only new/delete it
+  {
+    switch (m_logfile->m_file_format) {
+    case BINARY_LOG:
+      logbuffer = (LogBuffer *)m_data;
+      LogBuffer::destroy(logbuffer);
+      break;
+    case ASCII_LOG:
+    case ASCII_PIPE:
+      free(m_data);
+      break;
+    case N_LOGFILE_TYPES:
+    default:
+      ink_release_assert(!"Unknown file format type!");
+    }
+  }
+};
+
 /**
    This object exists to provide a namespace for the logging system.
    It contains all data types and global variables relevant to the
@@ -385,16 +420,19 @@ public:
   static void add_to_inactive(LogObject * obj);
 
   // logging thread stuff
-  static volatile unsigned long flush_counter;
-  static ink_mutex flush_mutex;
-  static ink_cond flush_cond;
-  static ink_thread flush_thread;
+  static ink_mutex *preproc_mutex;
+  static ink_cond *preproc_cond;
+  static void *preproc_thread_main(void *args);
+  static ink_mutex *flush_mutex;
+  static ink_cond *flush_cond;
+  static InkAtomicList *flush_data_list;
   static void *flush_thread_main(void *args);
 
   // collation thread stuff
   static ink_mutex collate_mutex;
   static ink_cond collate_cond;
   static ink_thread collate_thread;
+  static int collation_preproc_threads;
   static int collation_accept_file_descriptor;
   static int collation_port;
   static void *collate_thread_main(void *args);

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/LogBufferSink.h
----------------------------------------------------------------------
diff --git a/proxy/logging/LogBufferSink.h b/proxy/logging/LogBufferSink.h
index 6b3972c..b7d4ca6 100644
--- a/proxy/logging/LogBufferSink.h
+++ b/proxy/logging/LogBufferSink.h
@@ -36,11 +36,14 @@ class LogBufferSink
 {
 public:
   //
-  // The write_and_delete() function should be responsible for
+  // The preproc_and_try_delete() function should be responsible for
   // freeing memory pointed to by _buffer_ parameter.
   //
-  virtual int write_and_delete(LogBuffer * buffer) = 0;
-    virtual ~ LogBufferSink()
+  // Of course, this function may not free memory directly, it
+  // can delegate another function to do it.
+  //
+  virtual void preproc_and_try_delete(LogBuffer * buffer) = 0;
+  virtual ~ LogBufferSink()
   {
   };
 };

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/LogCollationClientSM.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogCollationClientSM.cc b/proxy/logging/LogCollationClientSM.cc
index 933bc38..f917742 100644
--- a/proxy/logging/LogCollationClientSM.cc
+++ b/proxy/logging/LogCollationClientSM.cc
@@ -711,8 +711,7 @@ LogCollationClientSM::flush_to_orphan()
     Debug("log-coll", "[%d]client::flush_to_orphan - m_buffer_in_iocore to oprhan", m_id);
     // TODO: We currently don't try to make the log buffers handle little vs big endian. TS-1156.
     // m_buffer_in_iocore->convert_to_host_order();
-    m_log_host->orphan_write(m_buffer_in_iocore);
-    LogBuffer::destroy(m_buffer_in_iocore);
+    m_log_host->orphan_write_and_try_delete(m_buffer_in_iocore);
     m_buffer_in_iocore = NULL;
   }
   // flush buffers in send_list to orphan
@@ -720,8 +719,7 @@ LogCollationClientSM::flush_to_orphan()
   ink_assert(m_buffer_send_list != NULL);
   while ((log_buffer = m_buffer_send_list->get()) != NULL) {
     Debug("log-coll", "[%d]client::flush_to_orphan - send_list to orphan", m_id);
-    m_log_host->orphan_write(log_buffer);
-    LogBuffer::destroy(log_buffer);
+    m_log_host->orphan_write_and_try_delete(log_buffer);
   }
 
   // Now send_list is empty, let's update m_flow to ALLOW status

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/LogCollationHostSM.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogCollationHostSM.cc b/proxy/logging/LogCollationHostSM.cc
index b6d3574..9add290 100644
--- a/proxy/logging/LogCollationHostSM.cc
+++ b/proxy/logging/LogCollationHostSM.cc
@@ -320,8 +320,8 @@ LogCollationHostSM::host_recv(int event, void * /* data ATS_UNUSED */)
         // object's flush queue
         //
         log_buffer = NEW(new LogBuffer(log_object, log_buffer_header));
-        log_object->add_to_flush_queue(log_buffer);
-        ink_cond_signal(&Log::flush_cond);
+        int idx = log_object->add_to_flush_queue(log_buffer);
+        ink_cond_signal(&Log::preproc_cond[idx]);
       }
 
 #if defined(LOG_BUFFER_TRACKING)

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/LogConfig.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogConfig.cc b/proxy/logging/LogConfig.cc
index 62875a8..6cf55ba 100644
--- a/proxy/logging/LogConfig.cc
+++ b/proxy/logging/LogConfig.cc
@@ -138,6 +138,7 @@ LogConfig::setup_default_values()
   collation_host = ats_strdup("none");
   collation_port = 0;
   collation_host_tagged = false;
+  collation_preproc_threads = 1;
   collation_secret = ats_strdup("foobar");
   collation_retry_sec = 0;
   collation_max_send_buffers = 0;
@@ -414,6 +415,11 @@ LogConfig::read_configuration_variables()
   val = (int) LOG_ConfigReadInteger("proxy.config.log.collation_host_tagged");
   collation_host_tagged = (val > 0);
 
+  val = (int) LOG_ConfigReadInteger("proxy.config.log.collation_preproc_threads");
+  if (val > 0 && val <= 128) {
+    collation_preproc_threads = val;
+  }
+
   ptr = LOG_ConfigReadString("proxy.config.log.collation_secret");
   if (ptr != NULL) {
     ats_free(collation_secret);
@@ -705,9 +711,10 @@ LogConfig::init(LogConfig * prev_config)
         (old_elog && Log::error_logging_enabled() &&
          (prev_config ? !strcmp(prev_config->logfile_dir, logfile_dir) : 0)))) {
     if (Log::error_logging_enabled()) {
-      new_elog =
-        NEW(new TextLogObject("error.log", logfile_dir, true, NULL,
-                              rolling_enabled, rolling_interval_sec, rolling_offset_hr, rolling_size_mb));
+      new_elog = NEW(new TextLogObject("error.log", logfile_dir, true, NULL,
+                                       rolling_enabled, collation_preproc_threads,
+                                       rolling_interval_sec, rolling_offset_hr,
+                                       rolling_size_mb));
       if (new_elog->do_filesystem_checks() < 0) {
         const char *msg = "The log file %s did not pass filesystem checks. " "No output will be produced for this log";
         Error(msg, new_elog->get_full_filename());
@@ -792,6 +799,7 @@ LogConfig::display(FILE * fd)
   fprintf(fd, "   collation_host = %s\n", collation_host);
   fprintf(fd, "   collation_port = %d\n", collation_port);
   fprintf(fd, "   collation_host_tagged = %d\n", collation_host_tagged);
+  fprintf(fd, "   collation_preproc_threads = %d\n", collation_preproc_threads);
   fprintf(fd, "   collation_secret = %s\n", collation_secret);
   fprintf(fd, "   rolling_enabled = %d\n", rolling_enabled);
   fprintf(fd, "   rolling_interval_sec = %d\n", rolling_interval_sec);
@@ -935,7 +943,9 @@ LogConfig::create_pre_defined_objects_with_filter(const PreDefinedFormatInfoList
     LogObject *obj;
     obj = NEW(new LogObject(pdi->format, logfile_dir, obj_fname,
                             pdi->is_ascii ? ASCII_LOG : BINARY_LOG,
-                            pdi->header, rolling_enabled, rolling_interval_sec, rolling_offset_hr, rolling_size_mb));
+                            pdi->header, rolling_enabled,
+                            collation_preproc_threads, rolling_interval_sec,
+                            rolling_offset_hr, rolling_size_mb));
 
     if (collation_mode == SEND_STD_FMTS || collation_mode == SEND_STD_AND_NON_XML_CUSTOM_FMTS) {
 
@@ -2173,6 +2183,7 @@ LogConfig::read_xml_log_config(int from_memory)
                                          file_type,
                                          header.dequeue(),
                                          obj_rolling_enabled,
+                                         collation_preproc_threads,
                                          obj_rolling_interval_sec,
                                          obj_rolling_offset_hr,
                                          obj_rolling_size_mb));

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/LogConfig.h
----------------------------------------------------------------------
diff --git a/proxy/logging/LogConfig.h b/proxy/logging/LogConfig.h
index 654b276..927e2bb 100644
--- a/proxy/logging/LogConfig.h
+++ b/proxy/logging/LogConfig.h
@@ -216,6 +216,7 @@ public:
   int collation_mode;
   int collation_port;
   bool collation_host_tagged;
+  int collation_preproc_threads;
   int collation_retry_sec;
   int collation_max_send_buffers;
   int rolling_enabled;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/LogFile.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogFile.cc b/proxy/logging/LogFile.cc
index 0a2e53f..df605df 100644
--- a/proxy/logging/LogFile.cc
+++ b/proxy/logging/LogFile.cc
@@ -102,7 +102,6 @@ LogFile::LogFile(const char *name, const char *header, LogFileFormat format,
   m_bytes_written = 0;
   m_size_bytes = 0;
   m_ascii_buffer_size = (ascii_buffer_size < max_line_size ? max_line_size : ascii_buffer_size);
-  m_ascii_buffer = NEW(new char[m_ascii_buffer_size]);
 
   Debug("log-file", "exiting LogFile constructor, m_name=%s, this=%p", m_name, this);
 }
@@ -126,8 +125,7 @@ LogFile::LogFile (const LogFile& copy)
     m_end_time (0L),
     m_bytes_written (0)
 {
-    ink_assert(m_ascii_buffer_size >= m_max_line_size);
-    m_ascii_buffer = NEW (new char[m_ascii_buffer_size]);
+    ink_release_assert(m_ascii_buffer_size >= m_max_line_size);
 
     Debug("log-file", "exiting LogFile copy constructor, m_name=%s, this=%p",
           m_name, this);
@@ -144,8 +142,6 @@ LogFile::~LogFile()
   ats_free(m_name);
   ats_free(m_header);
   delete m_meta_info;
-  delete[]m_ascii_buffer;
-  m_ascii_buffer = 0;
   Debug("log-file", "exiting LogFile destructor, this=%p", this);
 }
 
@@ -472,22 +468,28 @@ LogFile::roll(long interval_start, long interval_end)
 }
 
 /*-------------------------------------------------------------------------
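-  LogFile::write_and_try_delete
+  LogFile::preproc_and_try_delete
+
+  Preprocess the given LogBuffer before it is written to the target
+  file, then try to delete it once its reference count drops to zero.
+  Binary buffers are wrapped in a LogFlushData, pushed onto
+  Log::flush_data_list and the flush thread is signalled; the flush
+  thread performs the write(2) and then deletes the buffer.
+  ASCII buffers are passed to write_ascii_logbuffer3().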
 
-  Write the given LogBuffer data onto this file and
-  free the buffer memory according _need_delete_ parameter.
   -------------------------------------------------------------------------*/
-int
-LogFile::write_and_try_delete(LogBuffer * lb, bool need_delete)
+void
+LogFile::preproc_and_try_delete(LogBuffer * lb)
 {
-  int bytes = 0;
-  int result = -1;
   LogBufferHeader *buffer_header;
 
   if (lb == NULL) {
     Note("Cannot write LogBuffer to LogFile %s; LogBuffer is NULL", m_name);
-    return -1;
+    return;
   }
+
+  ink_atomic_increment(&lb->m_references, 1);
+
   if ((buffer_header = lb->header()) == NULL) {
     Note("Cannot write LogBuffer to LogFile %s; LogBufferHeader is NULL",
         m_name);
@@ -496,16 +498,17 @@ LogFile::write_and_try_delete(LogBuffer * lb, bool need_delete)
   if (buffer_header->entry_count == 0) {
     // no bytes to write
     Note("LogBuffer with 0 entries for LogFile %s, nothing to write", m_name);
-    result = 0;
     goto done;
   }
 
-  // make sure we're open & ready to write
-
-  check_fd();
-  if (!is_open()) {
-    goto done;
-  }
+  //
+  // If the start time for this file has yet to be established, then grab
+  // the low_timestamp from the given LogBuffer.  Then, we always set the
+  // end time to the high_timestamp, so it's always up to date.
+  //
+  if (!m_start_time)
+    m_start_time = buffer_header->low_timestamp;
+  m_end_time = buffer_header->high_timestamp;
 
   if (m_file_format == BINARY_LOG) {
     //
@@ -516,45 +519,28 @@ LogFile::write_and_try_delete(LogBuffer * lb, bool need_delete)
     // don't change between buffers), it's not worth trying to separate
     // out the buffer-dependent data from the buffer-independent data.
     //
-    while (static_cast<uint32_t>(bytes) < buffer_header->byte_count) {
-      int cnt = ::write(m_fd, buffer_header, buffer_header->byte_count);
-      if (cnt < 0) {
-        Error("An error was encountered writing to %s: [tried %d, wrote %d, '%s']",
-              m_name, buffer_header->byte_count, bytes, strerror(errno));
-        break;
-      }
-      bytes += cnt;
-    }
+    LogFlushData *flush_data = new LogFlushData(this, lb);
+
+    ink_atomiclist_push(Log::flush_data_list, flush_data);
+
+    ink_cond_signal(Log::flush_cond);
+
+    //
+    // LogBuffer will be deleted in flush thread
+    //
+    return;
   }
   else if (m_file_format == ASCII_LOG || m_file_format == ASCII_PIPE) {
-    bytes = write_ascii_logbuffer3(buffer_header);
-#if defined(LOG_BUFFER_TRACKING)
-    Debug("log-buftrak", "[%d]LogFile::write - ascii write complete",
-        buffer_header->id);
-#endif // defined(LOG_BUFFER_TRACKING)
+    write_ascii_logbuffer3(buffer_header);
   }
   else {
     Note("Cannot write LogBuffer to LogFile %s; invalid file format: %d",
-        m_name, m_file_format);
-    goto done;
+         m_name, m_file_format);
   }
 
-  //
-  // If the start time for this file has yet to be established, then grab
-  // the low_timestamp from the given LogBuffer.  Then, we always set the
-  // end time to the high_timestamp, so it's always up to date.
-  //
-
-  if (!m_start_time)
-    m_start_time = buffer_header->low_timestamp;
-  m_end_time = buffer_header->high_timestamp;
-  m_bytes_written += bytes;
-  result = bytes;
-
 done:
-  if (need_delete)
-    delete lb;
-  return result;
+  LogBuffer::destroy(lb);
+  return;
 }
 
 /*-------------------------------------------------------------------------
@@ -646,6 +632,7 @@ LogFile::write_ascii_logbuffer3(LogBufferHeader * buffer_header, char *alt_forma
   LogFormatType format_type;
   char *fieldlist_str;
   char *printf_str;
+  char *ascii_buffer;
 
   switch (buffer_header->version) {
   case LOG_SEGMENT_VERSION:
@@ -663,7 +650,10 @@ LogFile::write_ascii_logbuffer3(LogBufferHeader * buffer_header, char *alt_forma
   while ((entry_header = iter.next())) {
     fmt_buf_bytes = 0;
 
-    ink_release_assert(m_ascii_buffer_size >= m_max_line_size);
+    if (m_file_format == ASCII_PIPE)
+      ascii_buffer = (char *)malloc(m_max_line_size);
+    else
+      ascii_buffer = (char *)malloc(m_ascii_buffer_size);
 
     // fill the buffer with as many records as possible
     //
@@ -674,7 +664,7 @@ LogFile::write_ascii_logbuffer3(LogBufferHeader * buffer_header, char *alt_forma
       }
 
       int bytes = LogBuffer::to_ascii(entry_header, format_type,
-                                      &m_ascii_buffer[fmt_buf_bytes],
+                                      &ascii_buffer[fmt_buf_bytes],
                                       m_max_line_size - 1,
                                       fieldlist_str, printf_str,
                                       buffer_header->version,
@@ -682,7 +672,7 @@ LogFile::write_ascii_logbuffer3(LogBufferHeader * buffer_header, char *alt_forma
 
       if (bytes > 0) {
         fmt_buf_bytes += bytes;
-        m_ascii_buffer[fmt_buf_bytes] = '\n';
+        ascii_buffer[fmt_buf_bytes] = '\n';
         ++fmt_buf_bytes;
       } else {
         Error("Failed to convert LogBuffer to ascii, have dropped (%" PRIu32 ") bytes.",
@@ -699,29 +689,14 @@ LogFile::write_ascii_logbuffer3(LogBufferHeader * buffer_header, char *alt_forma
         break;
     } while ((entry_header = iter.next()));
 
-    int bytes_written = 0;
-
-    // try to write *all* the buffer out to the file or pipe
+    // send the buffer to flush thread
     //
-    while (bytes_written < fmt_buf_bytes) {
-      if (Log::config->logging_space_exhausted) {
-        Warning("logging space exhausted, have dropped (%d) bytes.",
-                (fmt_buf_bytes - bytes_written));
-        break;
-      }
+    LogFlushData *flush_data = new LogFlushData(this, ascii_buffer, fmt_buf_bytes);
+    ink_atomiclist_push(Log::flush_data_list, flush_data);
 
-      int cnt = ::write(m_fd, &m_ascii_buffer[bytes_written],
-                        (fmt_buf_bytes - bytes_written));
-      if (cnt < 0) {
-        Error("An error was encountered in writing to %s: %s.",
-              ((m_name) ? m_name : "logfile"), strerror(errno));
-        break;
-      }
-
-      bytes_written += cnt;
-    }
+    ink_cond_signal(Log::flush_cond);
 
-    total_bytes += bytes_written;
+    total_bytes += fmt_buf_bytes;
   }
 
   return total_bytes;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/LogFile.h
----------------------------------------------------------------------
diff --git a/proxy/logging/LogFile.h b/proxy/logging/LogFile.h
index ddba682..3a3dc45 100644
--- a/proxy/logging/LogFile.h
+++ b/proxy/logging/LogFile.h
@@ -141,13 +141,8 @@ public:
     LOG_FILE_FILESYSTEM_CHECKS_FAILED
   };
 
-  inline int write(LogBuffer * lb) {
-    return write_and_try_delete(lb, false);
-  }
+  void preproc_and_try_delete(LogBuffer * lb);
 
-  inline int write_and_delete(LogBuffer * lb) {
-    return write_and_try_delete(lb, true);
-  }
   int roll(long interval_start, long interval_end);
 
   char *get_name() const { return m_name; }
@@ -171,7 +166,7 @@ public:
   off_t get_size_bytes() const { return m_file_format != ASCII_PIPE? m_bytes_written : 0; };
   int do_filesystem_checks() { return 0; }; // TODO: this need to be tidy up when to redo the file checking
 
-private:
+public:
   bool is_open() { return (m_fd >= 0); }
   void close_file();
 
@@ -179,27 +174,20 @@ private:
   static int writeln(char *data, int len, int fd, const char *path);
   void read_metadata();
 
-  //
-  // Write the given LogBuffer data onto this file and
-  // free the buffer memory according _need_delete_ parameter.
-  //
-  int write_and_try_delete(LogBuffer * lb, bool need_delete);
-
-private:
+public:
   LogFileFormat m_file_format;
   char *m_name;
   char *m_header;
   uint64_t m_signature;           // signature of log object stored
   MetaInfo *m_meta_info;
 
-  char *m_ascii_buffer;         // buffer for ascii output
   size_t m_ascii_buffer_size;   // size of ascii buffer
   size_t m_max_line_size;       // size of longest log line (record)
 
   int m_fd;
   long m_start_time;
   long m_end_time;
-  uint64_t m_bytes_written;
+  volatile uint64_t m_bytes_written;
   off_t m_size_bytes;           // current size of file in bytes
 
 public:

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/LogHost.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogHost.cc b/proxy/logging/LogHost.cc
index 1f9ff0d..54297f5 100644
--- a/proxy/logging/LogHost.cc
+++ b/proxy/logging/LogHost.cc
@@ -260,17 +260,17 @@ LogHost::create_orphan_LogFile_object()
 }
 
 //
-// write the buffer data to target host and try to
-// delete it when its reference become zero.
+// preprocess the given buffer data before sent to target host
+// and try to delete it when its reference become zero.
 //
-int
-LogHost::write_and_try_delete (LogBuffer *lb)
+void
+LogHost::preproc_and_try_delete (LogBuffer *lb)
 {
-  int result = -1;
+  int bytes;
 
   if (lb == NULL) {
     Note("Cannot write LogBuffer to LogHost %s; LogBuffer is NULL", name());
-    return -1;
+    return;
   }
   LogBufferHeader *buffer_header = lb->header();
   if (buffer_header == NULL) {
@@ -280,7 +280,6 @@ LogHost::write_and_try_delete (LogBuffer *lb)
   }
   if (buffer_header->entry_count == 0) {
     // no bytes to write
-    result = 0;
     goto done;
   }
 
@@ -291,8 +290,8 @@ LogHost::write_and_try_delete (LogBuffer *lb)
   if (!connected(NOPING)) {
     if (!connect ()) {
       Note("Cannot write LogBuffer to LogHost %s; not connected", name());
-      result = orphan_write(lb);
-      goto done;
+      orphan_write_and_try_delete(lb);
+      return;
     }
   }
 
@@ -308,16 +307,10 @@ LogHost::write_and_try_delete (LogBuffer *lb)
     disconnect();
     // TODO: We currently don't try to make the log buffers handle little vs big endian. TS-1156.
     // lb->convert_to_host_order ();
-    result = orphan_write(lb);
-    goto done;
+    orphan_write_and_try_delete(lb);
+    return;
   }
 
-  Debug("log-host","%d bytes sent to LogHost %s:%u", bytes_sent,
-      name(), port());
-  SUM_DYN_STAT (log_stat_bytes_sent_to_network_stat, bytes_sent);
-  result = bytes_sent;
-  goto done;
-
 #else // !defined(IOCORE_LOG_COLLATION)
   // create a new collation client if necessary
   if (m_log_collation_client_sm == NULL) {
@@ -326,33 +319,37 @@ LogHost::write_and_try_delete (LogBuffer *lb)
   }
 
   // send log_buffer; orphan if necessary
-  result = m_log_collation_client_sm->send(lb);
-  if (result <= 0) {
-    result = orphan_write(lb);
+  bytes = m_log_collation_client_sm->send(lb);
+  if (bytes <= 0) {
+    orphan_write_and_try_delete(lb);
 #if defined(LOG_BUFFER_TRACKING)
-    Debug("log-buftrak", "[%d]LogHost::write_and_try_delete - orphan write complete",
+    Debug("log-buftrak", "[%d]LogHost::preproc_and_try_delete - orphan write complete",
         lb->header()->id);
 #endif // defined(LOG_BUFFER_TRACKING)
-    goto done;
   }
 
-  return result;
-
+  return;
 #endif // !defined(IOCORE_LOG_COLLATION)
 
 done:
   LogBuffer::destroy(lb);
-  return result;
+  return;
 }
 
-int
-LogHost::orphan_write(LogBuffer * lb)
+//
+// write the given buffer data to orhpan file and
+// try to delete it when its reference become zero.
+//
+void
+LogHost::orphan_write_and_try_delete(LogBuffer * lb)
 {
   if (!Log::config->logging_space_exhausted) {
     Debug("log-host", "Sending LogBuffer to orphan file %s", m_orphan_file->get_name());
-    return m_orphan_file->write(lb);
+    m_orphan_file->preproc_and_try_delete(lb);
   } else {
-    return 0;                   // nothing written
+    Note("logging space exhausted, failed to write orphan file, drop(%" PRIu32 ") bytes",
+         lb->header()->byte_count);
+    LogBuffer::destroy(lb);
   }
 }
 
@@ -449,10 +446,9 @@ LogHostList::clear()
   }
 }
 
-int
-LogHostList::write_and_delete(LogBuffer * lb)
+void
+LogHostList::preproc_and_try_delete(LogBuffer * lb)
 {
-  int total_bytes = 0;
   unsigned nr_host, nr;
 
   ink_release_assert(lb->m_references == 0);
@@ -461,16 +457,12 @@ LogHostList::write_and_delete(LogBuffer * lb)
   ink_atomic_increment(&lb->m_references, nr_host);
 
   for (LogHost * host = first(); host && nr; host = next(host)) {
-    int bytes = host->write_and_try_delete(lb);
-    if (bytes > 0)
-      total_bytes += bytes;
+    host->preproc_and_try_delete(lb);
     nr--;
   }
 
   if (nr_host == 0)
     delete lb;
-
-  return total_bytes;
 }
 
 void

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/LogHost.h
----------------------------------------------------------------------
diff --git a/proxy/logging/LogHost.h b/proxy/logging/LogHost.h
index 201ca93..e8bec1e 100644
--- a/proxy/logging/LogHost.h
+++ b/proxy/logging/LogHost.h
@@ -53,10 +53,10 @@ public:
   bool connect();
   void disconnect();
   //
-  // write the buffer data to target host and try to
-  // delete it when its reference become zero.
+  // preprocess the given buffer data before sent to target host
+  // and try to delete it when its reference become zero.
   //
-  int write_and_try_delete(LogBuffer * lb);
+  void preproc_and_try_delete(LogBuffer * lb);
 
   char const* name() const { return m_name ? m_name : "UNKNOWN"; }
   IpAddr const& ip_addr() const { return m_ip; }
@@ -72,7 +72,11 @@ public:
 private:
   void clear();
   bool authenticated();
-  int orphan_write(LogBuffer * lb);
+  //
+  // write the given buffer data to orhpan file and
+  // try to delete it when its reference become zero.
+  //
+  void orphan_write_and_try_delete(LogBuffer * lb);
   void create_orphan_LogFile_object();
 
 private:
@@ -111,7 +115,7 @@ public:
   void add(LogHost * host, bool copy = true);
   unsigned count();
   void clear();
-  int write_and_delete(LogBuffer * lb);
+  void preproc_and_try_delete(LogBuffer * lb);
 
   LogHost *first() { return m_host_list.head; }
   LogHost *next(LogHost * here) { return (here->link).next; }

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/LogObject.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogObject.cc b/proxy/logging/LogObject.cc
index b61eaa1..ea059e2 100644
--- a/proxy/logging/LogObject.cc
+++ b/proxy/logging/LogObject.cc
@@ -40,7 +40,7 @@
 #include "LogObject.h"
 
 size_t
-LogBufferManager::flush_buffers(LogBufferSink *sink) {
+LogBufferManager::preproc_buffers(LogBufferSink *sink) {
   SList(LogBuffer, write_link) q(write_list.popall()), new_q;
   LogBuffer *b = NULL;
   while ((b = q.pop())) {
@@ -56,16 +56,16 @@ LogBufferManager::flush_buffers(LogBufferSink *sink) {
     }
   }
 
-  int flushed = 0;
+  int prepared = 0;
   while ((b = new_q.pop())) {
     b->update_header_data();
-    sink->write_and_delete(b);
+    sink->preproc_and_try_delete(b);
     ink_atomic_increment(&_num_flush_buffers, -1);
-    flushed++;
+    prepared++;
   }
 
-  Debug("log-logbuffer", "flushed %d buffers", flushed);
-  return flushed;
+  Debug("log-logbuffer", "prepared %d buffers", prepared);
+  return prepared;
 }
 
 /*-------------------------------------------------------------------------
@@ -73,20 +73,24 @@ LogBufferManager::flush_buffers(LogBufferSink *sink) {
   -------------------------------------------------------------------------*/
 
 LogObject::LogObject(LogFormat *format, const char *log_dir,
-                      const char *basename, LogFileFormat file_format,
-                      const char *header, int rolling_enabled,
-                      int rolling_interval_sec, int rolling_offset_hr, int rolling_size_mb)
-  : m_alt_filename (NULL),
-    m_flags (0),
-    m_signature (0),
-    m_rolling_interval_sec (rolling_interval_sec),
-    m_rolling_offset_hr (rolling_offset_hr),
-    m_rolling_size_mb (rolling_size_mb),
-    m_last_roll_time(0),
-    m_ref_count (0)
+                     const char *basename, LogFileFormat file_format,
+                     const char *header, int rolling_enabled,
+                     int flush_threads, int rolling_interval_sec,
+                     int rolling_offset_hr, int rolling_size_mb):
+      m_alt_filename (NULL),
+      m_flags (0),
+      m_signature (0),
+      m_flush_threads (flush_threads),
+      m_rolling_interval_sec (rolling_interval_sec),
+      m_rolling_offset_hr (rolling_offset_hr),
+      m_rolling_size_mb (rolling_size_mb),
+      m_last_roll_time(0),
+      m_ref_count (0),
+      m_buffer_manager_idx(0)
 {
     ink_assert (format != NULL);
     m_format = new LogFormat(*format);
+    m_buffer_manager = new LogBufferManager[m_flush_threads];
 
     if (file_format == BINARY_LOG) {
         m_flags |= BINARY;
@@ -130,11 +134,13 @@ LogObject::LogObject(LogObject& rhs)
     m_alt_filename(ats_strdup(rhs.m_alt_filename)),
     m_flags(rhs.m_flags),
     m_signature(rhs.m_signature),
+    m_flush_threads(rhs.m_flush_threads),
     m_rolling_interval_sec(rhs.m_rolling_interval_sec),
     m_last_roll_time(rhs.m_last_roll_time),
     m_ref_count(0)
 {
     m_format = new LogFormat(*(rhs.m_format));
+    m_buffer_manager = new LogBufferManager[m_flush_threads];
 
     if (rhs.m_logFile) {
         m_logFile = NEW (new LogFile(*(rhs.m_logFile)));
@@ -172,7 +178,7 @@ LogObject::~LogObject()
     Debug("log-config", "LogObject refcount = %d, waiting for zero", m_ref_count);
   }
 
-  flush_buffers();
+  preproc_buffers();
 
   // here we need to free LogHost if it is remote logging.
   if (is_collation_client()) {
@@ -185,6 +191,7 @@ LogObject::~LogObject()
   ats_free(m_filename);
   ats_free(m_alt_filename);
   delete m_format;
+  delete[] m_buffer_manager;
   delete (LogBuffer*)FREELIST_POINTER(m_log_buffer);
 }
 
@@ -451,9 +458,10 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed) {
       if (FREELIST_POINTER(old_h) == FREELIST_POINTER(h)) {
         ink_atomic_increment(&buffer->m_references, FREELIST_VERSION(old_h) - 1);
 
+        int idx = m_buffer_manager_idx++ % m_flush_threads;
         Debug("log-logbuffer", "adding buffer %d to flush list after checkout", buffer->get_id());
-        m_buffer_manager.add_to_flush_queue(buffer);
-        ink_cond_signal(&Log::flush_cond);
+        m_buffer_manager[idx].add_to_flush_queue(buffer);
+        ink_cond_signal(&Log::preproc_cond[idx]);
 
       }
       decremented = true;
@@ -798,10 +806,14 @@ LogObject::do_filesystem_checks()
 /*-------------------------------------------------------------------------
   TextLogObject::TextLogObject
   -------------------------------------------------------------------------*/
-TextLogObject::TextLogObject(const char *name, const char *log_dir, bool timestamps, const char *header, int rolling_enabled,
-                             int rolling_interval_sec, int rolling_offset_hr, int rolling_size_mb)
+TextLogObject::TextLogObject(const char *name, const char *log_dir,
+                             bool timestamps, const char *header,
+                             int rolling_enabled, int flush_threads,
+                             int rolling_interval_sec, int rolling_offset_hr,
+                             int rolling_size_mb)
   : LogObject(NEW(new LogFormat(TEXT_LOG)), log_dir, name, ASCII_LOG, header,
-              rolling_enabled, rolling_interval_sec, rolling_offset_hr, rolling_size_mb), m_timestamps(timestamps)
+              rolling_enabled, flush_threads, rolling_interval_sec,
+              rolling_offset_hr, rolling_size_mb), m_timestamps(timestamps)
 {
 }
 
@@ -1155,19 +1167,19 @@ LogObjectManager::check_buffer_expiration(long time_now)
   }
 }
 
-size_t LogObjectManager::flush_buffers()
+size_t LogObjectManager::preproc_buffers(int idx)
 {
   size_t i;
-  size_t buffers_flushed = 0;
+  size_t buffers_preproced = 0;
 
   for (i = 0; i < _numObjects; i++) {
-    buffers_flushed += _objects[i]->flush_buffers();
+    buffers_preproced += _objects[i]->preproc_buffers(idx);
   }
 
   for (i = 0; i < _numAPIobjects; i++) {
-      buffers_flushed += _APIobjects[i]->flush_buffers();
+      buffers_preproced += _APIobjects[i]->preproc_buffers(idx);
   }
-  return buffers_flushed;
+  return buffers_preproced;
 }
 
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/LogObject.h
----------------------------------------------------------------------
diff --git a/proxy/logging/LogObject.h b/proxy/logging/LogObject.h
index 8e96c72..b296f81 100644
--- a/proxy/logging/LogObject.h
+++ b/proxy/logging/LogObject.h
@@ -75,11 +75,11 @@ class LogBufferManager
     LogBufferManager() : _num_flush_buffers(0) { }
 
     void add_to_flush_queue(LogBuffer *buffer) {
-    write_list.push(buffer);
-    ink_atomic_increment(&_num_flush_buffers, 1);
+      write_list.push(buffer);
+      ink_atomic_increment(&_num_flush_buffers, 1);
     }
 
-    size_t flush_buffers(LogBufferSink *sink);
+    size_t preproc_buffers(LogBufferSink *sink);
 };
 
 class LogObject
@@ -99,8 +99,9 @@ public:
 
   LogObject(LogFormat *format, const char *log_dir, const char *basename,
                  LogFileFormat file_format, const char *header,
-                 int rolling_enabled, int rolling_interval_sec = 0,
-                 int rolling_offset_hr = 0, int rolling_size_mb = 0);
+                 int rolling_enabled, int flush_threads,
+                 int rolling_interval_sec = 0, int rolling_offset_hr = 0,
+                 int rolling_size_mb = 0);
   LogObject(LogObject &);
   virtual ~LogObject();
 
@@ -114,16 +115,26 @@ public:
 
   int roll_files(long time_now = 0);
 
-  void add_to_flush_queue(LogBuffer * buffer) { m_buffer_manager.add_to_flush_queue(buffer); }
+  int add_to_flush_queue(LogBuffer * buffer)
+  {
+    int idx = m_buffer_manager_idx++ % m_flush_threads;
+
+    m_buffer_manager[idx].add_to_flush_queue(buffer);
 
-  size_t flush_buffers()
+    return idx;
+  }
+
+  size_t preproc_buffers(int idx = -1)
   {
     size_t nfb;
 
+    if (idx == -1)
+      idx = m_buffer_manager_idx++ % m_flush_threads;
+
     if (m_logFile) {
-      nfb = m_buffer_manager.flush_buffers(m_logFile);
+      nfb = m_buffer_manager[idx].preproc_buffers(m_logFile);
     } else {
-      nfb = m_buffer_manager.flush_buffers(&m_host_list);
+      nfb = m_buffer_manager[idx].preproc_buffers(&m_host_list);
     }
     return nfb;
   }
@@ -204,9 +215,10 @@ private:
   // name conflicts
 
   unsigned int m_flags;         // diverse object flags (see above)
-  uint64_t m_signature;           // INK_MD5 signature for object
+  uint64_t m_signature;         // INK_MD5 signature for object
 
   int m_rolling_enabled;
+  int m_flush_threads;          // number of flush threads
   int m_rolling_interval_sec;   // time interval between rolls
   // 0 means no rolling
   int m_rolling_offset_hr;      //
@@ -217,7 +229,8 @@ private:
   int m_ref_count;
 
   volatile head_p m_log_buffer;     // current work buffer
-  LogBufferManager m_buffer_manager;
+  unsigned m_buffer_manager_idx;
+  LogBufferManager *m_buffer_manager;
 
   void generate_filenames(const char *log_dir, const char *basename, LogFileFormat file_format);
   void _setup_rolling(int rolling_enabled, int rolling_interval_sec, int rolling_offset_hr, int rolling_size_mb);
@@ -240,8 +253,10 @@ class TextLogObject:public LogObject
 public:
   inkcoreapi TextLogObject(const char *name, const char *log_dir,
                            bool timestamps, const char *header,
-                           int rolling_enabled, int rolling_interval_sec = 0,
-                           int rolling_offset_hr = 0, int rolling_size_mb = 0);
+                           int rolling_enabled, int flush_threads,
+                           int rolling_interval_sec = 0,
+                           int rolling_offset_hr = 0,
+                           int rolling_size_mb = 0);
 
   inkcoreapi int write(const char *format, ...);
   inkcoreapi int va_write(const char *format, va_list ap);
@@ -365,7 +380,7 @@ public:
   void display(FILE * str = stdout);
   void add_filter_to_all(LogFilter * filter);
   LogObject *find_by_format_name(const char *name);
-  size_t flush_buffers();
+  size_t preproc_buffers(int idx);
   void open_local_pipes();
   void transfer_objects(LogObjectManager & mgr);
 


Re: git commit: TS-2089: introduce configurable collation preproc threads

Posted by Igor Galić <i....@brainsware.org>.
> 
> >  /*-------------------------------------------------------------------------
> > +<<<<<<< HEAD
> >    LogFile::write_and_try_delete
> > +=======
> > +  LogFile::preproc_and_try_delete
> > +
> > +  preprocess the given buffer data before write to target file
> > +  and try to delete it when its reference become zero.
> > +>>>>>>> TS-2089: introduce configurable collation preproc threads
> 
> 
> To reiterate a point Leif made recently about one of *my* commits:
> We should make sure that every single commit actually compiles..
> 

Looking at this a second time, I realize this was in a comment,
so I feel a little foolish now, but we might still want to fix that. 

-- 
Igor Galić

Tel: +43 (0) 664 886 22 883
Mail: i.galic@brainsware.org
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515  2EA5 4B1D 9E08 A097 C9AE


Re: git commit: TS-2089: introduce configurable collation preproc threads

Posted by Yunkai Zhang <yu...@gmail.com>.
On Fri, Aug 16, 2013 at 6:11 PM, Igor Galić <i....@brainsware.org> wrote:

>
>
> ----- Original Message -----
> > Updated Branches:
> >   refs/heads/master e0ec5304d -> 8a1112881
> >
> >
> > TS-2089: introduce configurable collation preproc threads
> >
> > We found that CPU of logging thread could be easy to reach up 100% in
> > collation host, but disk IO was low at the same time.
> >
> > The bottleneck of logging thread is that some preprocessing job, such as
> > convert LogBuffer to ascii text, consume so much CPU time. And more
> > worse, the write() operation will block logging thread.
> >
> > So this patch try to split logging thread into two parts:
> > 1) Configurable preproc threads, which are responsiable for processing
> all
> >    of prepare work, and then forward the preprocessed buffer to flush
> thread,
> >    or send them to CollationClient/HostSM.
> >
> > 2) One Flush thread, it will consume preprocessed buffers and write them
> to
> >    disk. In our testing, one flush thread is enough for us.
> >
> > TODO: This patch supports only one flush thread, we can improve it to
> >       "one flush thread per file/disk" in the future.
> >
> > == How to configure ==
> > The number of preproc threads is 1 by default.
> >
> > Please modify "proxy.config.log.collation_preproc_threads" option to
> > change it.
>
>
> First off: I *love* your commit messages.
>
> > Signed-off-by: Yunkai Zhang <qi...@taobao.com>
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> > Commit:
> http://git-wip-us.apache.org/repos/asf/trafficserver/commit/8a111288
> > Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/8a111288
> > Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/8a111288
> >
> > Branch: refs/heads/master
> > Commit: 8a1112881813b5a09e0de8f51770f99337febcfc
> > Parents: e0ec530
> > Author: Yunkai Zhang <qi...@taobao.com>
> > Authored: Sun Aug 11 16:42:32 2013 +0800
> > Committer: Yunkai Zhang <qi...@taobao.com>
> > Committed: Fri Aug 16 10:41:16 2013 +0800
> >
> > ----------------------------------------------------------------------
> >  CHANGES                               |   3 +
> >  mgmt/RecordsConfig.cc                 |   2 +
> >  mgmt/cli/ShowCmd.cc                   |   3 +
> >  proxy/logging/Log.cc                  | 257
> ++++++++++++++++++++++++-----
> >  proxy/logging/Log.h                   |  46 +++++-
> >  proxy/logging/LogBufferSink.h         |   9 +-
> >  proxy/logging/LogCollationClientSM.cc |   6 +-
> >  proxy/logging/LogCollationHostSM.cc   |   4 +-
> >  proxy/logging/LogConfig.cc            |  19 ++-
> >  proxy/logging/LogConfig.h             |   1 +
> >  proxy/logging/LogFile.cc              | 121 ++++++--------
> >  proxy/logging/LogFile.h               |  20 +--
> >  proxy/logging/LogHost.cc              |  66 ++++----
> >  proxy/logging/LogHost.h               |  14 +-
> >  proxy/logging/LogObject.cc            |  68 ++++----
> >  proxy/logging/LogObject.h             |  43 +++--
> >  16 files changed, 450 insertions(+), 232 deletions(-)
> > ----------------------------------------------------------------------
>
> I'm missing a change to doc/reference/configuration/records.config.en.rst
> We should make it a habit of adding documentation in the same commit as
> new records.config changes.
>

I didn't notice the "records.config.en.rst" file, sorry.

I'll sync it later.



>
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/CHANGES
> > ----------------------------------------------------------------------
> > diff --git a/CHANGES b/CHANGES
> > index 6b9a3bb..c613f00 100644
> > --- a/CHANGES
> > +++ b/CHANGES
> > @@ -1,6 +1,9 @@
> >                                                           -*- coding:
> utf-8
> >                                                           -*-
> >  Changes with Apache Traffic Server 3.5.0
> >
> > +
> > +  *) TS-2089: introduce configurable collation preproc threads
> > +
> >    *) [TS-2132, TS-2131] ${libexecdir} and $(localstatedir} chowned
> >     needlessly chowned to to ATS' user.
> >     Author: Tomasz Kuzemko <to...@kuzemko.net>
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/RecordsConfig.cc
> > ----------------------------------------------------------------------
> > diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
> > index cfc2267..9402581 100644
> > --- a/mgmt/RecordsConfig.cc
> > +++ b/mgmt/RecordsConfig.cc
> > @@ -1147,6 +1147,8 @@ RecordElement RecordsConfig[] = {
> >    ,
> >    {RECT_CONFIG, "proxy.config.log.collation_max_send_buffers", RECD_INT,
> >    "16", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
> >    ,
> > +  {RECT_CONFIG, "proxy.config.log.collation_preproc_threads", RECD_INT,
> "1",
> > RECU_DYNAMIC, RR_REQUIRED, RECC_INT, "[1-128]", RECA_NULL}
> > +  ,
> >    {RECT_CONFIG, "proxy.config.log.rolling_enabled", RECD_INT, "1",
> >    RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-4]", RECA_NULL}
> >    ,
> >    {RECT_CONFIG, "proxy.config.log.rolling_interval_sec", RECD_INT,
> "86400",
> >    RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/cli/ShowCmd.cc
> > ----------------------------------------------------------------------
> > diff --git a/mgmt/cli/ShowCmd.cc b/mgmt/cli/ShowCmd.cc
> > index 0798c43..ed71560 100644
> > --- a/mgmt/cli/ShowCmd.cc
> > +++ b/mgmt/cli/ShowCmd.cc
> > @@ -1555,6 +1555,7 @@ ShowLogging()
> >    TSInt collation_port = -1;
> >    TSString collation_secret = NULL;
> >    TSInt host_tag = 0;
> > +  TSInt preproc_threads = 0;
> >    TSInt orphan_space = -1;
> >
> >    TSInt squid_log = 0;
> > @@ -1596,6 +1597,7 @@ ShowLogging()
> >    Cli_RecordGetString("proxy.config.log.collation_secret",
> >    &collation_secret);
> >    Cli_RecordGetInt("proxy.config.log.collation_host_tagged", &host_tag);
> >    Cli_RecordGetInt("proxy.config.log.max_space_mb_for_orphan_logs",
> >    &orphan_space);
> > +  Cli_RecordGetInt("proxy.config.log.collation_preproc_threads",
> > &preproc_threads);
> >
> >    Cli_RecordGetInt("proxy.config.log.squid_log_enabled", &squid_log);
> >    Cli_RecordGetInt("proxy.config.log.squid_log_is_ascii", &is_ascii);
> > @@ -1657,6 +1659,7 @@ ShowLogging()
> >    Cli_Printf("  Port ----------------------------------- %d\n",
> >    collation_port);
> >    Cli_Printf("  Secret --------------------------------- %s\n",
> >    collation_secret);
> >    Cli_PrintEnable("  Host Tagged ---------------------------- ",
> host_tag);
> > +  Cli_PrintEnable("  Preproc Threads ------------------------ ",
> > preproc_threads);
> >    Cli_Printf("  Space Limit for Orphan Files ----------- %d MB\n",
> >    orphan_space);
> >
> >    Cli_PrintEnable("\nSquid Format ----------------------------- ",
> >    squid_log);
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/Log.cc
> > ----------------------------------------------------------------------
> > diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
> > index 94d5625..1361978 100644
> > --- a/proxy/logging/Log.cc
> > +++ b/proxy/logging/Log.cc
> > @@ -77,16 +77,18 @@ size_t Log::numInactiveObjects;
> >  size_t Log::maxInactiveObjects;
> >
> >  // Flush thread stuff
> > -volatile unsigned long Log::flush_counter = 0;
>
> This bit is fascinating: We're replacing the (seemingly)
> unused volatile variable flush_counter with the already
> existing variable m_bytes_written..
>
> We turn that variable into a volatile, and continue to
> use it as if nothing happened. But here's the question:
> why was flush_counter not used?
>

Log::flush_counter is unused in the original code. I don't know why it
exists; it seems to be deprecated code.

The reason for changing LogFile.m_bytes_written to volatile is that we now
have multiple preproc threads and one flush thread, and those threads may
change m_bytes_written concurrently.


> > -ink_mutex Log::flush_mutex;
> > -ink_cond Log::flush_cond;
> > -ink_thread Log::flush_thread;
> > +ink_mutex *Log::preproc_mutex;
> > +ink_cond *Log::preproc_cond;
> > +ink_mutex *Log::flush_mutex;
> > +ink_cond *Log::flush_cond;
> > +InkAtomicList *Log::flush_data_list;
> >
> >  // Collate thread stuff
> >  ink_mutex Log::collate_mutex;
> >  ink_cond Log::collate_cond;
> >  ink_thread Log::collate_thread;
> >  int Log::collation_accept_file_descriptor;
> > +int Log::collation_preproc_threads;
> >  int Log::collation_port;
> >
> >  // Log private objects
> > @@ -179,16 +181,28 @@ Log::add_to_inactive(LogObject * object)
> >
> >  struct PeriodicWakeup;
> >  typedef int (PeriodicWakeup::*PeriodicWakeupHandler)(int, void *);
> > -struct PeriodicWakeup : Continuation {
> > +struct PeriodicWakeup : Continuation
> > +{
> > +  int m_preproc_threads;
> > +  int m_flush_threads;
> > +
> >    int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
> >    {
> > -    ink_cond_signal (&Log::flush_cond);
> > -    return EVENT_CONT;
> > +      for (int i = 0; i < m_preproc_threads; i++) {
> > +        ink_cond_signal (&Log::preproc_cond[i]);
> > +      }
> > +      for (int i = 0; i < m_flush_threads; i++) {
> > +        ink_cond_signal (&Log::flush_cond[i]);
> > +      }
> > +      return EVENT_CONT;
> >    }
> >
> > -  PeriodicWakeup () : Continuation (new_ProxyMutex())
> > +  PeriodicWakeup (int preproc_threads, int flush_threads) :
> > +    Continuation (new_ProxyMutex()),
> > +    m_preproc_threads(preproc_threads),
> > +    m_flush_threads(flush_threads)
> >    {
> > -    SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> > +      SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> >    }
> >  };
> >
> > @@ -286,15 +300,33 @@ Log::periodic_tasks(long time_now)
> >
>  /*-------------------------------------------------------------------------
> >    MAIN INTERFACE
> >
>  -------------------------------------------------------------------------*/
> > +struct LoggingPreprocContinuation: public Continuation
> > +{
> > +  int m_idx;
> > +
> > +  int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED
> */)
> > +  {
> > +    Log::preproc_thread_main((void *)&m_idx);
> > +    return 0;
> > +  }
> > +
> > +  LoggingPreprocContinuation(int idx):Continuation(NULL), m_idx(idx)
> > +  {
> > +    SET_HANDLER(&LoggingPreprocContinuation::mainEvent);
> > +  }
> > +};
> > +
> >  struct LoggingFlushContinuation: public Continuation
> >  {
> > +  int m_idx;
> > +
> >    int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED
> */)
> >    {
> > -    Log::flush_thread_main(NULL);
> > +    Log::flush_thread_main((void *)&m_idx);
> >      return 0;
> >    }
> >
> > -  LoggingFlushContinuation():Continuation(NULL)
> > +  LoggingFlushContinuation(int idx):Continuation(NULL), m_idx(idx)
> >    {
> >      SET_HANDLER(&LoggingFlushContinuation::mainEvent);
> >    }
> > @@ -910,6 +942,7 @@ Log::init(int flags)
> >    numInactiveObjects = 0;
> >    inactive_objects = new LogObject*[maxInactiveObjects];
> >
> > +  collation_preproc_threads = 1;
> >    collation_accept_file_descriptor = NO_FD;
> >
> >    // store the configuration flags
> > @@ -931,6 +964,7 @@ Log::init(int flags)
> >
> >      config->read_configuration_variables();
> >      collation_port = config->collation_port;
> > +    collation_preproc_threads = config->collation_preproc_threads;
> >
> >      if (config_flags & STANDALONE_COLLATOR) {
> >        logging_mode = LOG_TRANSACTIONS_ONLY;
> > @@ -959,8 +993,8 @@ Log::init(int flags)
> >      create_threads();
> >
> >  #ifndef INK_SINGLE_THREADED
> > -    eventProcessor.schedule_every(NEW (new PeriodicWakeup()),
> HRTIME_SECOND,
> > -        ET_CALL);
> > +    eventProcessor.schedule_every(NEW (new
> > PeriodicWakeup(collation_preproc_threads, 1)),
> > +                                  HRTIME_SECOND, ET_CALL);
> >  #endif
> >      init_status |= PERIODIC_WAKEUP_SCHEDULED;
> >
> > @@ -1001,9 +1035,16 @@ Log::init_when_enabled()
> >      // setup global scrap object
> >      //
> >      global_scrap_format = NEW(new LogFormat(TEXT_LOG));
> > -    global_scrap_object = NEW(new LogObject(global_scrap_format,
> > Log::config->logfile_dir, "scrapfile.log", BINARY_LOG,
> > -                                            NULL,
> > Log::config->rolling_enabled, Log::config->rolling_interval_sec,
> > -
>  Log::config->rolling_offset_hr,
> > Log::config->rolling_size_mb));
> > +    global_scrap_object =
> > +      NEW(new LogObject(global_scrap_format,
> > +                        Log::config->logfile_dir,
> > +                        "scrapfile.log",
> > +                        BINARY_LOG, NULL,
> > +                        Log::config->rolling_enabled,
> > +                        Log::config->collation_preproc_threads,
> > +                        Log::config->rolling_interval_sec,
> > +                        Log::config->rolling_offset_hr,
> > +                        Log::config->rolling_size_mb));
> >
> >      // create the flush thread and the collation thread
> >      //
> > @@ -1030,15 +1071,43 @@ Log::create_threads()
> >
> >    REC_ReadConfigInteger(stacksize,
> "proxy.config.thread.default.stacksize");
> >    if (!(init_status & THREADS_CREATED)) {
> > -    // start the flush thread
> > +
> > +    char desc[64];
> > +    preproc_mutex = new ink_mutex[collation_preproc_threads];
> > +    preproc_cond = new ink_cond[collation_preproc_threads];
> > +
> > +    size_t stacksize;
> > +    REC_ReadConfigInteger(stacksize,
> "proxy.config.thread.default.stacksize");
>
> This ^ seems like an unnecessary step, that could be pushed into ...
>
> > +
> > +    // start the preproc threads
> >      //
> >      // no need for the conditional var since it will be relying on
> >      // on the event system.
> > -    ink_mutex_init(&flush_mutex, "Flush thread mutex");
> > -    ink_cond_init(&flush_cond);
> > -    Continuation *flush_continuation = NEW(new
> LoggingFlushContinuation);
> > -    Event *flush_event = eventProcessor.spawn_thread(flush_continuation,
> > "[LOGGING]", stacksize);
> > -    flush_thread = flush_event->ethread->tid;
> > +    for (int i = 0; i < collation_preproc_threads; i++) {
> > +      sprintf(desc, "Logging preproc thread mutex[%d]", i);
> > +      ink_mutex_init(&preproc_mutex[i], desc);
> > +      ink_cond_init(&preproc_cond[i]);
> > +      Continuation *preproc_cont = NEW(new
> LoggingPreprocContinuation(i));
> > +      sprintf(desc, "[LOG_PREPROC %d]", i);
> > +      eventProcessor.spawn_thread(preproc_cont, desc, stacksize);
>
> spawn_thread() as a default.. i.e.:
>
> eventProcessor::spawn_thread(Continuation, const char * desc, int
> stacksize -1)
> {
>   size_t thread_stacksize;
>   if stacksize -1 {
>     REC_ReadConfigInteger(thread_stacksize,
> "proxy.config.thread.default.stacksize")
>   } else {
>      thread_stacksize = stacksize
>   }
>
> }
>

Yes, I agree. I think it's better to improve spawn_thread() in a separate
patch.


>
>
>
>
> >
>  /*-------------------------------------------------------------------------
> > +<<<<<<< HEAD
> >    LogFile::write_and_try_delete
> > +=======
> > +  LogFile::preproc_and_try_delete
> > +
> > +  preprocess the given buffer data before write to target file
> > +  and try to delete it when its reference become zero.
> > +>>>>>>> TS-2089: introduce configurable collation preproc threads
>
>
> To reiterate a point Leif made recently about one of *my* commits:
> We should make sure that every single commit actually compiles..
>

Yes, but the conflict markers are inside a comment block, so it still compiles :D

Thanks for your review.


>
> Anyway, that's all I have for now.


> -- i
> Igor Galić
>
> Tel: +43 (0) 664 886 22 883
> Mail: i.galic@brainsware.org
> URL: http://brainsware.org/
> GPG: 6880 4155 74BD FD7C B515  2EA5 4B1D 9E08 A097 C9AE
>



-- 
Yunkai Zhang
Work at Taobao

Re: git commit: TS-2089: introduce configurable collation preproc threads

Posted by Igor Galić <i....@brainsware.org>.

----- Original Message -----
> Updated Branches:
>   refs/heads/master e0ec5304d -> 8a1112881
> 
> 
> TS-2089: introduce configurable collation preproc threads
> 
> We found that the CPU usage of the logging thread could easily reach 100% on
> a collation host, while disk IO stayed low at the same time.
> 
> The bottleneck of the logging thread is that some preprocessing jobs, such as
> converting a LogBuffer to ASCII text, consume a lot of CPU time. Worse,
> the write() operation blocks the logging thread.
> 
> So this patch splits the logging thread into two parts:
> 1) Configurable preproc threads, which are responsible for all of the
>    preparation work; they then forward the preprocessed buffers to the flush
>    thread, or send them to CollationClient/HostSM.
> 
> 2) One flush thread, which consumes the preprocessed buffers and writes them to
>    disk. In our testing, one flush thread was enough.
> 
> TODO: This patch supports only one flush thread; in the future it could be
>       improved to use "one flush thread per file/disk".
> 
> == How to configure ==
> The number of preproc threads is 1 by default.
> 
> Set the "proxy.config.log.collation_preproc_threads" option to
> change it.


First off: I *love* your commit messages.

> Signed-off-by: Yunkai Zhang <qi...@taobao.com>
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/8a111288
> Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/8a111288
> Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/8a111288
> 
> Branch: refs/heads/master
> Commit: 8a1112881813b5a09e0de8f51770f99337febcfc
> Parents: e0ec530
> Author: Yunkai Zhang <qi...@taobao.com>
> Authored: Sun Aug 11 16:42:32 2013 +0800
> Committer: Yunkai Zhang <qi...@taobao.com>
> Committed: Fri Aug 16 10:41:16 2013 +0800
> 
> ----------------------------------------------------------------------
>  CHANGES                               |   3 +
>  mgmt/RecordsConfig.cc                 |   2 +
>  mgmt/cli/ShowCmd.cc                   |   3 +
>  proxy/logging/Log.cc                  | 257 ++++++++++++++++++++++++-----
>  proxy/logging/Log.h                   |  46 +++++-
>  proxy/logging/LogBufferSink.h         |   9 +-
>  proxy/logging/LogCollationClientSM.cc |   6 +-
>  proxy/logging/LogCollationHostSM.cc   |   4 +-
>  proxy/logging/LogConfig.cc            |  19 ++-
>  proxy/logging/LogConfig.h             |   1 +
>  proxy/logging/LogFile.cc              | 121 ++++++--------
>  proxy/logging/LogFile.h               |  20 +--
>  proxy/logging/LogHost.cc              |  66 ++++----
>  proxy/logging/LogHost.h               |  14 +-
>  proxy/logging/LogObject.cc            |  68 ++++----
>  proxy/logging/LogObject.h             |  43 +++--
>  16 files changed, 450 insertions(+), 232 deletions(-)
> ----------------------------------------------------------------------

I don't see a change to doc/reference/configuration/records.config.en.rst.
We should make a habit of adding documentation in the same commit as
any new records.config setting.

> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/CHANGES
> ----------------------------------------------------------------------
> diff --git a/CHANGES b/CHANGES
> index 6b9a3bb..c613f00 100644
> --- a/CHANGES
> +++ b/CHANGES
> @@ -1,6 +1,9 @@
>                                                           -*- coding: utf-8
>                                                           -*-
>  Changes with Apache Traffic Server 3.5.0
>  
> +
> +  *) TS-2089: introduce configurable collation preproc threads
> +
>    *) [TS-2132, TS-2131] ${libexecdir} and $(localstatedir} chowned
>     needlessly chowned to to ATS' user.
>     Author: Tomasz Kuzemko <to...@kuzemko.net>
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/RecordsConfig.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
> index cfc2267..9402581 100644
> --- a/mgmt/RecordsConfig.cc
> +++ b/mgmt/RecordsConfig.cc
> @@ -1147,6 +1147,8 @@ RecordElement RecordsConfig[] = {
>    ,
>    {RECT_CONFIG, "proxy.config.log.collation_max_send_buffers", RECD_INT,
>    "16", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
>    ,
> +  {RECT_CONFIG, "proxy.config.log.collation_preproc_threads", RECD_INT, "1",
> RECU_DYNAMIC, RR_REQUIRED, RECC_INT, "[1-128]", RECA_NULL}
> +  ,
>    {RECT_CONFIG, "proxy.config.log.rolling_enabled", RECD_INT, "1",
>    RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-4]", RECA_NULL}
>    ,
>    {RECT_CONFIG, "proxy.config.log.rolling_interval_sec", RECD_INT, "86400",
>    RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/cli/ShowCmd.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/cli/ShowCmd.cc b/mgmt/cli/ShowCmd.cc
> index 0798c43..ed71560 100644
> --- a/mgmt/cli/ShowCmd.cc
> +++ b/mgmt/cli/ShowCmd.cc
> @@ -1555,6 +1555,7 @@ ShowLogging()
>    TSInt collation_port = -1;
>    TSString collation_secret = NULL;
>    TSInt host_tag = 0;
> +  TSInt preproc_threads = 0;
>    TSInt orphan_space = -1;
>  
>    TSInt squid_log = 0;
> @@ -1596,6 +1597,7 @@ ShowLogging()
>    Cli_RecordGetString("proxy.config.log.collation_secret",
>    &collation_secret);
>    Cli_RecordGetInt("proxy.config.log.collation_host_tagged", &host_tag);
>    Cli_RecordGetInt("proxy.config.log.max_space_mb_for_orphan_logs",
>    &orphan_space);
> +  Cli_RecordGetInt("proxy.config.log.collation_preproc_threads",
> &preproc_threads);
>  
>    Cli_RecordGetInt("proxy.config.log.squid_log_enabled", &squid_log);
>    Cli_RecordGetInt("proxy.config.log.squid_log_is_ascii", &is_ascii);
> @@ -1657,6 +1659,7 @@ ShowLogging()
>    Cli_Printf("  Port ----------------------------------- %d\n",
>    collation_port);
>    Cli_Printf("  Secret --------------------------------- %s\n",
>    collation_secret);
>    Cli_PrintEnable("  Host Tagged ---------------------------- ", host_tag);
> +  Cli_PrintEnable("  Preproc Threads ------------------------ ",
> preproc_threads);
>    Cli_Printf("  Space Limit for Orphan Files ----------- %d MB\n",
>    orphan_space);
>  
>    Cli_PrintEnable("\nSquid Format ----------------------------- ",
>    squid_log);
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/Log.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
> index 94d5625..1361978 100644
> --- a/proxy/logging/Log.cc
> +++ b/proxy/logging/Log.cc
> @@ -77,16 +77,18 @@ size_t Log::numInactiveObjects;
>  size_t Log::maxInactiveObjects;
>  
>  // Flush thread stuff
> -volatile unsigned long Log::flush_counter = 0;

This bit is fascinating: we're replacing the (seemingly)
unused volatile variable flush_counter with the already
existing variable m_bytes_written.

We turn that variable into a volatile, and continue to
use it as if nothing happened. But here's the question:
why was flush_counter not used?

> -ink_mutex Log::flush_mutex;
> -ink_cond Log::flush_cond;
> -ink_thread Log::flush_thread;
> +ink_mutex *Log::preproc_mutex;
> +ink_cond *Log::preproc_cond;
> +ink_mutex *Log::flush_mutex;
> +ink_cond *Log::flush_cond;
> +InkAtomicList *Log::flush_data_list;
>  
>  // Collate thread stuff
>  ink_mutex Log::collate_mutex;
>  ink_cond Log::collate_cond;
>  ink_thread Log::collate_thread;
>  int Log::collation_accept_file_descriptor;
> +int Log::collation_preproc_threads;
>  int Log::collation_port;
>  
>  // Log private objects
> @@ -179,16 +181,28 @@ Log::add_to_inactive(LogObject * object)
>  
>  struct PeriodicWakeup;
>  typedef int (PeriodicWakeup::*PeriodicWakeupHandler)(int, void *);
> -struct PeriodicWakeup : Continuation {
> +struct PeriodicWakeup : Continuation
> +{
> +  int m_preproc_threads;
> +  int m_flush_threads;
> +
>    int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
>    {
> -    ink_cond_signal (&Log::flush_cond);
> -    return EVENT_CONT;
> +      for (int i = 0; i < m_preproc_threads; i++) {
> +        ink_cond_signal (&Log::preproc_cond[i]);
> +      }
> +      for (int i = 0; i < m_flush_threads; i++) {
> +        ink_cond_signal (&Log::flush_cond[i]);
> +      }
> +      return EVENT_CONT;
>    }
>  
> -  PeriodicWakeup () : Continuation (new_ProxyMutex())
> +  PeriodicWakeup (int preproc_threads, int flush_threads) :
> +    Continuation (new_ProxyMutex()),
> +    m_preproc_threads(preproc_threads),
> +    m_flush_threads(flush_threads)
>    {
> -    SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> +      SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
>    }
>  };
>  
> @@ -286,15 +300,33 @@ Log::periodic_tasks(long time_now)
>  /*-------------------------------------------------------------------------
>    MAIN INTERFACE
>    -------------------------------------------------------------------------*/
> +struct LoggingPreprocContinuation: public Continuation
> +{
> +  int m_idx;
> +
> +  int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
> +  {
> +    Log::preproc_thread_main((void *)&m_idx);
> +    return 0;
> +  }
> +
> +  LoggingPreprocContinuation(int idx):Continuation(NULL), m_idx(idx)
> +  {
> +    SET_HANDLER(&LoggingPreprocContinuation::mainEvent);
> +  }
> +};
> +
>  struct LoggingFlushContinuation: public Continuation
>  {
> +  int m_idx;
> +
>    int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
>    {
> -    Log::flush_thread_main(NULL);
> +    Log::flush_thread_main((void *)&m_idx);
>      return 0;
>    }
>  
> -  LoggingFlushContinuation():Continuation(NULL)
> +  LoggingFlushContinuation(int idx):Continuation(NULL), m_idx(idx)
>    {
>      SET_HANDLER(&LoggingFlushContinuation::mainEvent);
>    }
> @@ -910,6 +942,7 @@ Log::init(int flags)
>    numInactiveObjects = 0;
>    inactive_objects = new LogObject*[maxInactiveObjects];
>  
> +  collation_preproc_threads = 1;
>    collation_accept_file_descriptor = NO_FD;
>  
>    // store the configuration flags
> @@ -931,6 +964,7 @@ Log::init(int flags)
>  
>      config->read_configuration_variables();
>      collation_port = config->collation_port;
> +    collation_preproc_threads = config->collation_preproc_threads;
>  
>      if (config_flags & STANDALONE_COLLATOR) {
>        logging_mode = LOG_TRANSACTIONS_ONLY;
> @@ -959,8 +993,8 @@ Log::init(int flags)
>      create_threads();
>  
>  #ifndef INK_SINGLE_THREADED
> -    eventProcessor.schedule_every(NEW (new PeriodicWakeup()), HRTIME_SECOND,
> -        ET_CALL);
> +    eventProcessor.schedule_every(NEW (new
> PeriodicWakeup(collation_preproc_threads, 1)),
> +                                  HRTIME_SECOND, ET_CALL);
>  #endif
>      init_status |= PERIODIC_WAKEUP_SCHEDULED;
>  
> @@ -1001,9 +1035,16 @@ Log::init_when_enabled()
>      // setup global scrap object
>      //
>      global_scrap_format = NEW(new LogFormat(TEXT_LOG));
> -    global_scrap_object = NEW(new LogObject(global_scrap_format,
> Log::config->logfile_dir, "scrapfile.log", BINARY_LOG,
> -                                            NULL,
> Log::config->rolling_enabled, Log::config->rolling_interval_sec,
> -                                            Log::config->rolling_offset_hr,
> Log::config->rolling_size_mb));
> +    global_scrap_object =
> +      NEW(new LogObject(global_scrap_format,
> +                        Log::config->logfile_dir,
> +                        "scrapfile.log",
> +                        BINARY_LOG, NULL,
> +                        Log::config->rolling_enabled,
> +                        Log::config->collation_preproc_threads,
> +                        Log::config->rolling_interval_sec,
> +                        Log::config->rolling_offset_hr,
> +                        Log::config->rolling_size_mb));
>  
>      // create the flush thread and the collation thread
>      //
> @@ -1030,15 +1071,43 @@ Log::create_threads()
>  
>    REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
>    if (!(init_status & THREADS_CREATED)) {
> -    // start the flush thread
> +
> +    char desc[64];
> +    preproc_mutex = new ink_mutex[collation_preproc_threads];
> +    preproc_cond = new ink_cond[collation_preproc_threads];
> +
> +    size_t stacksize;
> +    REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");

This ^ seems like an unnecessary step that could be pushed into ...

> +
> +    // start the preproc threads
>      //
>      // no need for the conditional var since it will be relying on
>      // on the event system.
> -    ink_mutex_init(&flush_mutex, "Flush thread mutex");
> -    ink_cond_init(&flush_cond);
> -    Continuation *flush_continuation = NEW(new LoggingFlushContinuation);
> -    Event *flush_event = eventProcessor.spawn_thread(flush_continuation,
> "[LOGGING]", stacksize);
> -    flush_thread = flush_event->ethread->tid;
> +    for (int i = 0; i < collation_preproc_threads; i++) {
> +      sprintf(desc, "Logging preproc thread mutex[%d]", i);
> +      ink_mutex_init(&preproc_mutex[i], desc);
> +      ink_cond_init(&preproc_cond[i]);
> +      Continuation *preproc_cont = NEW(new LoggingPreprocContinuation(i));
> +      sprintf(desc, "[LOG_PREPROC %d]", i);
> +      eventProcessor.spawn_thread(preproc_cont, desc, stacksize);

spawn_thread() as a default, i.e.:

EventProcessor::spawn_thread(Continuation *cont, const char *desc, int stacksize = -1)
{
  size_t thread_stacksize;
  if (stacksize == -1) {
    REC_ReadConfigInteger(thread_stacksize, "proxy.config.thread.default.stacksize");
  } else {
    thread_stacksize = stacksize;
  }
  ...
}




>  /*-------------------------------------------------------------------------
> +<<<<<<< HEAD
>    LogFile::write_and_try_delete
> +=======
> +  LogFile::preproc_and_try_delete
> +
> +  preprocess the given buffer data before write to target file
> +  and try to delete it when its reference become zero.
> +>>>>>>> TS-2089: introduce configurable collation preproc threads


To reiterate a point Leif made recently about one of *my* commits:
we should make sure that every single commit actually compiles.

Anyway, that's all I have for now.

-- i 
Igor Galić

Tel: +43 (0) 664 886 22 883
Mail: i.galic@brainsware.org
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515  2EA5 4B1D 9E08 A097 C9AE