Posted to dev@trafficserver.apache.org by Igor Galić <i....@brainsware.org> on 2013/08/16 12:11:49 UTC

Re: git commit: TS-2089: introduce configurable collation preproc threads


----- Original Message -----
> Updated Branches:
>   refs/heads/master e0ec5304d -> 8a1112881
> 
> 
> TS-2089: introduce configurable collation preproc threads
> 
> We found that CPU of logging thread could be easy to reach up 100% in
> collation host, but disk IO was low at the same time.
> 
> The bottleneck of logging thread is that some preprocessing job, such as
> convert LogBuffer to ascii text, consume so much CPU time. And more
> worse, the write() operation will block logging thread.
> 
> So this patch try to split logging thread into two parts:
> 1) Configurable preproc threads, which are responsiable for processing all
>    of prepare work, and then forward the preprocessed buffer to flush thread,
>    or send them to CollationClient/HostSM.
> 
> 2) One Flush thread, it will consume preprocessed buffers and write them to
>    disk. In our testing, one flush thread is enough for us.
> 
> TODO: This patch supports only one flush thread, we can improve it to
>       "one flush thread per file/disk" in the future.
> 
> == How to configure ==
> The number of preproc threads is 1 by default.
> 
> Please modify "proxy.config.log.collation_preproc_threads" option to
> change it.


First off: I *love* your commit messages.
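
To make sure I read the design right, the split described above (several preproc threads doing the CPU-heavy conversion, one flush thread doing the writes) can be sketched roughly like this. This is only an illustration in plain C++11 under my own assumptions: FlushQueue and run_pipeline are hypothetical names, and the patch itself appears to hand buffers over through an InkAtomicList with ink_mutex/ink_cond rather than std:: primitives.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical hand-off queue between the preproc threads and the flush thread.
class FlushQueue {
public:
  void push(std::string text) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_items.push(std::move(text));
    }
    m_cond.notify_one();
  }

  // Signals that no more items will be pushed.
  void close() {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_closed = true;
    }
    m_cond.notify_all();
  }

  // Blocks until an item is available; returns false once closed and drained.
  bool pop(std::string &out) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cond.wait(lock, [this] { return !m_items.empty() || m_closed; });
    if (m_items.empty()) {
      return false;
    }
    out = std::move(m_items.front());
    m_items.pop();
    return true;
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_cond;
  std::queue<std::string> m_items;
  bool m_closed = false;
};

// Runs n_preproc producer threads plus one flush thread and returns the
// number of bytes the flush thread consumed.
std::size_t run_pipeline(int n_preproc, int buffers_per_thread) {
  assert(n_preproc >= 1);
  FlushQueue queue;
  std::size_t bytes_written = 0; // only touched by the flush thread

  std::thread flusher([&] {
    std::string text;
    while (queue.pop(text)) {
      bytes_written += text.size(); // stand-in for the blocking write()
    }
  });

  std::vector<std::thread> preproc;
  for (int t = 0; t < n_preproc; t++) {
    preproc.emplace_back([&queue, t, buffers_per_thread] {
      for (int i = 0; i < buffers_per_thread; i++) {
        // Stand-in for the LogBuffer -> ASCII conversion.
        queue.push("entry " + std::to_string(t) + "\n");
      }
    });
  }

  for (auto &th : preproc) {
    th.join();
  }
  queue.close();
  flusher.join();
  return bytes_written;
}
```

The point of the shape is that the write path never competes with the conversion work for the same thread, which matches the "CPU at 100%, disk IO low" symptom in the commit message.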

> Signed-off-by: Yunkai Zhang <qi...@taobao.com>
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/8a111288
> Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/8a111288
> Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/8a111288
> 
> Branch: refs/heads/master
> Commit: 8a1112881813b5a09e0de8f51770f99337febcfc
> Parents: e0ec530
> Author: Yunkai Zhang <qi...@taobao.com>
> Authored: Sun Aug 11 16:42:32 2013 +0800
> Committer: Yunkai Zhang <qi...@taobao.com>
> Committed: Fri Aug 16 10:41:16 2013 +0800
> 
> ----------------------------------------------------------------------
>  CHANGES                               |   3 +
>  mgmt/RecordsConfig.cc                 |   2 +
>  mgmt/cli/ShowCmd.cc                   |   3 +
>  proxy/logging/Log.cc                  | 257 ++++++++++++++++++++++++-----
>  proxy/logging/Log.h                   |  46 +++++-
>  proxy/logging/LogBufferSink.h         |   9 +-
>  proxy/logging/LogCollationClientSM.cc |   6 +-
>  proxy/logging/LogCollationHostSM.cc   |   4 +-
>  proxy/logging/LogConfig.cc            |  19 ++-
>  proxy/logging/LogConfig.h             |   1 +
>  proxy/logging/LogFile.cc              | 121 ++++++--------
>  proxy/logging/LogFile.h               |  20 +--
>  proxy/logging/LogHost.cc              |  66 ++++----
>  proxy/logging/LogHost.h               |  14 +-
>  proxy/logging/LogObject.cc            |  68 ++++----
>  proxy/logging/LogObject.h             |  43 +++--
>  16 files changed, 450 insertions(+), 232 deletions(-)
> ----------------------------------------------------------------------

I'm missing a change to doc/reference/configuration/records.config.en.rst.
We should make it a habit of adding documentation in the same commit as
new records.config changes.
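
For the docs, the entry itself would presumably look like the line below in records.config. The value 4 is just an arbitrary illustration on my part; per the RecordsConfig.cc hunk further down, the default is 1 and the accepted range is [1-128].

```
CONFIG proxy.config.log.collation_preproc_threads INT 4
```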

> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/CHANGES
> ----------------------------------------------------------------------
> diff --git a/CHANGES b/CHANGES
> index 6b9a3bb..c613f00 100644
> --- a/CHANGES
> +++ b/CHANGES
> @@ -1,6 +1,9 @@
>                                                           -*- coding: utf-8
>                                                           -*-
>  Changes with Apache Traffic Server 3.5.0
>  
> +
> +  *) TS-2089: introduce configurable collation preproc threads
> +
>    *) [TS-2132, TS-2131] ${libexecdir} and $(localstatedir} chowned
>     needlessly chowned to to ATS' user.
>     Author: Tomasz Kuzemko <to...@kuzemko.net>
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/RecordsConfig.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
> index cfc2267..9402581 100644
> --- a/mgmt/RecordsConfig.cc
> +++ b/mgmt/RecordsConfig.cc
> @@ -1147,6 +1147,8 @@ RecordElement RecordsConfig[] = {
>    ,
>    {RECT_CONFIG, "proxy.config.log.collation_max_send_buffers", RECD_INT,
>    "16", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
>    ,
> +  {RECT_CONFIG, "proxy.config.log.collation_preproc_threads", RECD_INT, "1",
> RECU_DYNAMIC, RR_REQUIRED, RECC_INT, "[1-128]", RECA_NULL}
> +  ,
>    {RECT_CONFIG, "proxy.config.log.rolling_enabled", RECD_INT, "1",
>    RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-4]", RECA_NULL}
>    ,
>    {RECT_CONFIG, "proxy.config.log.rolling_interval_sec", RECD_INT, "86400",
>    RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/cli/ShowCmd.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/cli/ShowCmd.cc b/mgmt/cli/ShowCmd.cc
> index 0798c43..ed71560 100644
> --- a/mgmt/cli/ShowCmd.cc
> +++ b/mgmt/cli/ShowCmd.cc
> @@ -1555,6 +1555,7 @@ ShowLogging()
>    TSInt collation_port = -1;
>    TSString collation_secret = NULL;
>    TSInt host_tag = 0;
> +  TSInt preproc_threads = 0;
>    TSInt orphan_space = -1;
>  
>    TSInt squid_log = 0;
> @@ -1596,6 +1597,7 @@ ShowLogging()
>    Cli_RecordGetString("proxy.config.log.collation_secret",
>    &collation_secret);
>    Cli_RecordGetInt("proxy.config.log.collation_host_tagged", &host_tag);
>    Cli_RecordGetInt("proxy.config.log.max_space_mb_for_orphan_logs",
>    &orphan_space);
> +  Cli_RecordGetInt("proxy.config.log.collation_preproc_threads",
> &preproc_threads);
>  
>    Cli_RecordGetInt("proxy.config.log.squid_log_enabled", &squid_log);
>    Cli_RecordGetInt("proxy.config.log.squid_log_is_ascii", &is_ascii);
> @@ -1657,6 +1659,7 @@ ShowLogging()
>    Cli_Printf("  Port ----------------------------------- %d\n",
>    collation_port);
>    Cli_Printf("  Secret --------------------------------- %s\n",
>    collation_secret);
>    Cli_PrintEnable("  Host Tagged ---------------------------- ", host_tag);
> +  Cli_PrintEnable("  Preproc Threads ------------------------ ",
> preproc_threads);
>    Cli_Printf("  Space Limit for Orphan Files ----------- %d MB\n",
>    orphan_space);
>  
>    Cli_PrintEnable("\nSquid Format ----------------------------- ",
>    squid_log);
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/Log.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
> index 94d5625..1361978 100644
> --- a/proxy/logging/Log.cc
> +++ b/proxy/logging/Log.cc
> @@ -77,16 +77,18 @@ size_t Log::numInactiveObjects;
>  size_t Log::maxInactiveObjects;
>  
>  // Flush thread stuff
> -volatile unsigned long Log::flush_counter = 0;

This bit is fascinating: we're replacing the (seemingly)
unused volatile variable flush_counter with the already
existing variable m_bytes_written.

We turn that variable into a volatile, and continue to
use it as if nothing happened. But here's the question:
why was flush_counter not used?

> -ink_mutex Log::flush_mutex;
> -ink_cond Log::flush_cond;
> -ink_thread Log::flush_thread;
> +ink_mutex *Log::preproc_mutex;
> +ink_cond *Log::preproc_cond;
> +ink_mutex *Log::flush_mutex;
> +ink_cond *Log::flush_cond;
> +InkAtomicList *Log::flush_data_list;
>  
>  // Collate thread stuff
>  ink_mutex Log::collate_mutex;
>  ink_cond Log::collate_cond;
>  ink_thread Log::collate_thread;
>  int Log::collation_accept_file_descriptor;
> +int Log::collation_preproc_threads;
>  int Log::collation_port;
>  
>  // Log private objects
> @@ -179,16 +181,28 @@ Log::add_to_inactive(LogObject * object)
>  
>  struct PeriodicWakeup;
>  typedef int (PeriodicWakeup::*PeriodicWakeupHandler)(int, void *);
> -struct PeriodicWakeup : Continuation {
> +struct PeriodicWakeup : Continuation
> +{
> +  int m_preproc_threads;
> +  int m_flush_threads;
> +
>    int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
>    {
> -    ink_cond_signal (&Log::flush_cond);
> -    return EVENT_CONT;
> +      for (int i = 0; i < m_preproc_threads; i++) {
> +        ink_cond_signal (&Log::preproc_cond[i]);
> +      }
> +      for (int i = 0; i < m_flush_threads; i++) {
> +        ink_cond_signal (&Log::flush_cond[i]);
> +      }
> +      return EVENT_CONT;
>    }
>  
> -  PeriodicWakeup () : Continuation (new_ProxyMutex())
> +  PeriodicWakeup (int preproc_threads, int flush_threads) :
> +    Continuation (new_ProxyMutex()),
> +    m_preproc_threads(preproc_threads),
> +    m_flush_threads(flush_threads)
>    {
> -    SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> +      SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
>    }
>  };
>  
> @@ -286,15 +300,33 @@ Log::periodic_tasks(long time_now)
>  /*-------------------------------------------------------------------------
>    MAIN INTERFACE
>    -------------------------------------------------------------------------*/
> +struct LoggingPreprocContinuation: public Continuation
> +{
> +  int m_idx;
> +
> +  int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
> +  {
> +    Log::preproc_thread_main((void *)&m_idx);
> +    return 0;
> +  }
> +
> +  LoggingPreprocContinuation(int idx):Continuation(NULL), m_idx(idx)
> +  {
> +    SET_HANDLER(&LoggingPreprocContinuation::mainEvent);
> +  }
> +};
> +
>  struct LoggingFlushContinuation: public Continuation
>  {
> +  int m_idx;
> +
>    int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
>    {
> -    Log::flush_thread_main(NULL);
> +    Log::flush_thread_main((void *)&m_idx);
>      return 0;
>    }
>  
> -  LoggingFlushContinuation():Continuation(NULL)
> +  LoggingFlushContinuation(int idx):Continuation(NULL), m_idx(idx)
>    {
>      SET_HANDLER(&LoggingFlushContinuation::mainEvent);
>    }
> @@ -910,6 +942,7 @@ Log::init(int flags)
>    numInactiveObjects = 0;
>    inactive_objects = new LogObject*[maxInactiveObjects];
>  
> +  collation_preproc_threads = 1;
>    collation_accept_file_descriptor = NO_FD;
>  
>    // store the configuration flags
> @@ -931,6 +964,7 @@ Log::init(int flags)
>  
>      config->read_configuration_variables();
>      collation_port = config->collation_port;
> +    collation_preproc_threads = config->collation_preproc_threads;
>  
>      if (config_flags & STANDALONE_COLLATOR) {
>        logging_mode = LOG_TRANSACTIONS_ONLY;
> @@ -959,8 +993,8 @@ Log::init(int flags)
>      create_threads();
>  
>  #ifndef INK_SINGLE_THREADED
> -    eventProcessor.schedule_every(NEW (new PeriodicWakeup()), HRTIME_SECOND,
> -        ET_CALL);
> +    eventProcessor.schedule_every(NEW (new
> PeriodicWakeup(collation_preproc_threads, 1)),
> +                                  HRTIME_SECOND, ET_CALL);
>  #endif
>      init_status |= PERIODIC_WAKEUP_SCHEDULED;
>  
> @@ -1001,9 +1035,16 @@ Log::init_when_enabled()
>      // setup global scrap object
>      //
>      global_scrap_format = NEW(new LogFormat(TEXT_LOG));
> -    global_scrap_object = NEW(new LogObject(global_scrap_format,
> Log::config->logfile_dir, "scrapfile.log", BINARY_LOG,
> -                                            NULL,
> Log::config->rolling_enabled, Log::config->rolling_interval_sec,
> -                                            Log::config->rolling_offset_hr,
> Log::config->rolling_size_mb));
> +    global_scrap_object =
> +      NEW(new LogObject(global_scrap_format,
> +                        Log::config->logfile_dir,
> +                        "scrapfile.log",
> +                        BINARY_LOG, NULL,
> +                        Log::config->rolling_enabled,
> +                        Log::config->collation_preproc_threads,
> +                        Log::config->rolling_interval_sec,
> +                        Log::config->rolling_offset_hr,
> +                        Log::config->rolling_size_mb));
>  
>      // create the flush thread and the collation thread
>      //
> @@ -1030,15 +1071,43 @@ Log::create_threads()
>  
>    REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
>    if (!(init_status & THREADS_CREATED)) {
> -    // start the flush thread
> +
> +    char desc[64];
> +    preproc_mutex = new ink_mutex[collation_preproc_threads];
> +    preproc_cond = new ink_cond[collation_preproc_threads];
> +
> +    size_t stacksize;
> +    REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");

This ^ seems like an unnecessary step, that could be pushed into ...

> +
> +    // start the preproc threads
>      //
>      // no need for the conditional var since it will be relying on
>      // on the event system.
> -    ink_mutex_init(&flush_mutex, "Flush thread mutex");
> -    ink_cond_init(&flush_cond);
> -    Continuation *flush_continuation = NEW(new LoggingFlushContinuation);
> -    Event *flush_event = eventProcessor.spawn_thread(flush_continuation,
> "[LOGGING]", stacksize);
> -    flush_thread = flush_event->ethread->tid;
> +    for (int i = 0; i < collation_preproc_threads; i++) {
> +      sprintf(desc, "Logging preproc thread mutex[%d]", i);
> +      ink_mutex_init(&preproc_mutex[i], desc);
> +      ink_cond_init(&preproc_cond[i]);
> +      Continuation *preproc_cont = NEW(new LoggingPreprocContinuation(i));
> +      sprintf(desc, "[LOG_PREPROC %d]", i);
> +      eventProcessor.spawn_thread(preproc_cont, desc, stacksize);

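spawn_thread() as a default, i.e. (untested sketch):

Event *
EventProcessor::spawn_thread(Continuation *cont, const char *desc, int stacksize = -1)
{
  size_t thread_stacksize;
  if (stacksize == -1) {
    // no explicit size given: fall back to the configured default
    REC_ReadConfigInteger(thread_stacksize, "proxy.config.thread.default.stacksize");
  } else {
    thread_stacksize = stacksize;
  }
  // ... spawn the thread with thread_stacksize, as before ...
}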




>  /*-------------------------------------------------------------------------
> +<<<<<<< HEAD
>    LogFile::write_and_try_delete
> +=======
> +  LogFile::preproc_and_try_delete
> +
> +  preprocess the given buffer data before write to target file
> +  and try to delete it when its reference become zero.
> +>>>>>>> TS-2089: introduce configurable collation preproc threads


To reiterate a point Leif made recently about one of *my* commits:
We should make sure that every single commit actually compiles.

Anyway, that's all I have for now.

-- i 
Igor Galić

Tel: +43 (0) 664 886 22 883
Mail: i.galic@brainsware.org
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515  2EA5 4B1D 9E08 A097 C9AE

Re: git commit: TS-2089: introduce configurable collation preproc threads

Posted by Igor Galić <i....@brainsware.org>.
> 
> >  /*-------------------------------------------------------------------------
> > +<<<<<<< HEAD
> >    LogFile::write_and_try_delete
> > +=======
> > +  LogFile::preproc_and_try_delete
> > +
> > +  preprocess the given buffer data before write to target file
> > +  and try to delete it when its reference become zero.
> > +>>>>>>> TS-2089: introduce configurable collation preproc threads
> 
> 
> To reiterate a point Leif made recently about one of *my* commits:
> We should make sure that every single commit actually compiles..
> 

Looking at this a second time, I realize this was in a comment,
so I feel a little foolish now, but we still might want to fix that.

-- 
Igor Galić

Tel: +43 (0) 664 886 22 883
Mail: i.galic@brainsware.org
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515  2EA5 4B1D 9E08 A097 C9AE


Re: git commit: TS-2089: introduce configurable collation preproc threads

Posted by Yunkai Zhang <yu...@gmail.com>.
On Fri, Aug 16, 2013 at 6:11 PM, Igor Galić <i....@brainsware.org> wrote:

>
>
> ----- Original Message -----
> > Updated Branches:
> >   refs/heads/master e0ec5304d -> 8a1112881
> >
> >
> > TS-2089: introduce configurable collation preproc threads
> >
> > We found that CPU of logging thread could be easy to reach up 100% in
> > collation host, but disk IO was low at the same time.
> >
> > The bottleneck of logging thread is that some preprocessing job, such as
> > convert LogBuffer to ascii text, consume so much CPU time. And more
> > worse, the write() operation will block logging thread.
> >
> > So this patch try to split logging thread into two parts:
> > 1) Configurable preproc threads, which are responsiable for processing all
> >    of prepare work, and then forward the preprocessed buffer to flush thread,
> >    or send them to CollationClient/HostSM.
> >
> > 2) One Flush thread, it will consume preprocessed buffers and write them to
> >    disk. In our testing, one flush thread is enough for us.
> >
> > TODO: This patch supports only one flush thread, we can improve it to
> >       "one flush thread per file/disk" in the future.
> >
> > == How to configure ==
> > The number of preproc threads is 1 by default.
> >
> > Please modify "proxy.config.log.collation_preproc_threads" option to
> > change it.
>
>
> First off: I *love* your commit messages.
>
> > Signed-off-by: Yunkai Zhang <qi...@taobao.com>
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> > Commit:
> http://git-wip-us.apache.org/repos/asf/trafficserver/commit/8a111288
> > Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/8a111288
> > Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/8a111288
> >
> > Branch: refs/heads/master
> > Commit: 8a1112881813b5a09e0de8f51770f99337febcfc
> > Parents: e0ec530
> > Author: Yunkai Zhang <qi...@taobao.com>
> > Authored: Sun Aug 11 16:42:32 2013 +0800
> > Committer: Yunkai Zhang <qi...@taobao.com>
> > Committed: Fri Aug 16 10:41:16 2013 +0800
> >
> > ----------------------------------------------------------------------
> >  CHANGES                               |   3 +
> >  mgmt/RecordsConfig.cc                 |   2 +
> >  mgmt/cli/ShowCmd.cc                   |   3 +
> >  proxy/logging/Log.cc                  | 257 ++++++++++++++++++++++++-----
> >  proxy/logging/Log.h                   |  46 +++++-
> >  proxy/logging/LogBufferSink.h         |   9 +-
> >  proxy/logging/LogCollationClientSM.cc |   6 +-
> >  proxy/logging/LogCollationHostSM.cc   |   4 +-
> >  proxy/logging/LogConfig.cc            |  19 ++-
> >  proxy/logging/LogConfig.h             |   1 +
> >  proxy/logging/LogFile.cc              | 121 ++++++--------
> >  proxy/logging/LogFile.h               |  20 +--
> >  proxy/logging/LogHost.cc              |  66 ++++----
> >  proxy/logging/LogHost.h               |  14 +-
> >  proxy/logging/LogObject.cc            |  68 ++++----
> >  proxy/logging/LogObject.h             |  43 +++--
> >  16 files changed, 450 insertions(+), 232 deletions(-)
> > ----------------------------------------------------------------------
>
> I'm missing a change to doc/reference/configuration/records.config.en.rst
> We should make it a habit of adding documentation in the same commit as
> new records.config changes.
>

I didn't notice the "records.config.en.rst" file, sorry.

I'll sync it later.



>
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/CHANGES
> > ----------------------------------------------------------------------
> > diff --git a/CHANGES b/CHANGES
> > index 6b9a3bb..c613f00 100644
> > --- a/CHANGES
> > +++ b/CHANGES
> > @@ -1,6 +1,9 @@
> >                                                           -*- coding:
> utf-8
> >                                                           -*-
> >  Changes with Apache Traffic Server 3.5.0
> >
> > +
> > +  *) TS-2089: introduce configurable collation preproc threads
> > +
> >    *) [TS-2132, TS-2131] ${libexecdir} and $(localstatedir} chowned
> >     needlessly chowned to to ATS' user.
> >     Author: Tomasz Kuzemko <to...@kuzemko.net>
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/RecordsConfig.cc
> > ----------------------------------------------------------------------
> > diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
> > index cfc2267..9402581 100644
> > --- a/mgmt/RecordsConfig.cc
> > +++ b/mgmt/RecordsConfig.cc
> > @@ -1147,6 +1147,8 @@ RecordElement RecordsConfig[] = {
> >    ,
> >    {RECT_CONFIG, "proxy.config.log.collation_max_send_buffers", RECD_INT,
> >    "16", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
> >    ,
> > +  {RECT_CONFIG, "proxy.config.log.collation_preproc_threads", RECD_INT,
> "1",
> > RECU_DYNAMIC, RR_REQUIRED, RECC_INT, "[1-128]", RECA_NULL}
> > +  ,
> >    {RECT_CONFIG, "proxy.config.log.rolling_enabled", RECD_INT, "1",
> >    RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-4]", RECA_NULL}
> >    ,
> >    {RECT_CONFIG, "proxy.config.log.rolling_interval_sec", RECD_INT,
> "86400",
> >    RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/cli/ShowCmd.cc
> > ----------------------------------------------------------------------
> > diff --git a/mgmt/cli/ShowCmd.cc b/mgmt/cli/ShowCmd.cc
> > index 0798c43..ed71560 100644
> > --- a/mgmt/cli/ShowCmd.cc
> > +++ b/mgmt/cli/ShowCmd.cc
> > @@ -1555,6 +1555,7 @@ ShowLogging()
> >    TSInt collation_port = -1;
> >    TSString collation_secret = NULL;
> >    TSInt host_tag = 0;
> > +  TSInt preproc_threads = 0;
> >    TSInt orphan_space = -1;
> >
> >    TSInt squid_log = 0;
> > @@ -1596,6 +1597,7 @@ ShowLogging()
> >    Cli_RecordGetString("proxy.config.log.collation_secret",
> >    &collation_secret);
> >    Cli_RecordGetInt("proxy.config.log.collation_host_tagged", &host_tag);
> >    Cli_RecordGetInt("proxy.config.log.max_space_mb_for_orphan_logs",
> >    &orphan_space);
> > +  Cli_RecordGetInt("proxy.config.log.collation_preproc_threads",
> > &preproc_threads);
> >
> >    Cli_RecordGetInt("proxy.config.log.squid_log_enabled", &squid_log);
> >    Cli_RecordGetInt("proxy.config.log.squid_log_is_ascii", &is_ascii);
> > @@ -1657,6 +1659,7 @@ ShowLogging()
> >    Cli_Printf("  Port ----------------------------------- %d\n",
> >    collation_port);
> >    Cli_Printf("  Secret --------------------------------- %s\n",
> >    collation_secret);
> >    Cli_PrintEnable("  Host Tagged ---------------------------- ",
> host_tag);
> > +  Cli_PrintEnable("  Preproc Threads ------------------------ ",
> > preproc_threads);
> >    Cli_Printf("  Space Limit for Orphan Files ----------- %d MB\n",
> >    orphan_space);
> >
> >    Cli_PrintEnable("\nSquid Format ----------------------------- ",
> >    squid_log);
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/Log.cc
> > ----------------------------------------------------------------------
> > diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
> > index 94d5625..1361978 100644
> > --- a/proxy/logging/Log.cc
> > +++ b/proxy/logging/Log.cc
> > @@ -77,16 +77,18 @@ size_t Log::numInactiveObjects;
> >  size_t Log::maxInactiveObjects;
> >
> >  // Flush thread stuff
> > -volatile unsigned long Log::flush_counter = 0;
>
> This bit is fascinating: We're replacing the (seemingly)
> unused volatile variable flush_counter with the already
> existing variable m_bytes_written..
>
> We turn that variable into a volatile, and continue to
> use it as if nothing happened. But here's the question:
> why was flush_counter not used?
>

Log::flush_counter is unused in the original code. I don't know why it
exists; it seems to be dead code.

LogFile.m_bytes_written was changed to volatile because we now have multiple
preproc threads and one flush thread, and those threads may change
m_bytes_written concurrently.
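
One caveat worth keeping in mind here: volatile only stops the compiler from caching the value, it does not make a read-modify-write such as += atomic, so concurrent writers still need an atomic add. Below is a minimal sketch of that idea, assuming C++11 std::atomic purely for illustration (count_bytes is a hypothetical helper, and the tree has its own atomic helpers that are not shown here).

```cpp
#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical stand-in for several threads updating a shared
// "bytes written" counter such as LogFile::m_bytes_written.
uint64_t count_bytes(int n_threads, int writes_per_thread, uint64_t bytes_per_write) {
  std::atomic<uint64_t> bytes_written{0};

  std::vector<std::thread> threads;
  for (int t = 0; t < n_threads; t++) {
    threads.emplace_back([&bytes_written, writes_per_thread, bytes_per_write] {
      for (int i = 0; i < writes_per_thread; i++) {
        // Atomic add: no update is lost even when threads race.
        bytes_written.fetch_add(bytes_per_write, std::memory_order_relaxed);
      }
    });
  }
  for (auto &th : threads) {
    th.join();
  }
  return bytes_written.load();
}
```

With a plain volatile counter and += in the loop, the total could come out lower than expected under contention; with the atomic add it always equals n_threads * writes_per_thread * bytes_per_write.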


> > -ink_mutex Log::flush_mutex;
> > -ink_cond Log::flush_cond;
> > -ink_thread Log::flush_thread;
> > +ink_mutex *Log::preproc_mutex;
> > +ink_cond *Log::preproc_cond;
> > +ink_mutex *Log::flush_mutex;
> > +ink_cond *Log::flush_cond;
> > +InkAtomicList *Log::flush_data_list;
> >
> >  // Collate thread stuff
> >  ink_mutex Log::collate_mutex;
> >  ink_cond Log::collate_cond;
> >  ink_thread Log::collate_thread;
> >  int Log::collation_accept_file_descriptor;
> > +int Log::collation_preproc_threads;
> >  int Log::collation_port;
> >
> >  // Log private objects
> > @@ -179,16 +181,28 @@ Log::add_to_inactive(LogObject * object)
> >
> >  struct PeriodicWakeup;
> >  typedef int (PeriodicWakeup::*PeriodicWakeupHandler)(int, void *);
> > -struct PeriodicWakeup : Continuation {
> > +struct PeriodicWakeup : Continuation
> > +{
> > +  int m_preproc_threads;
> > +  int m_flush_threads;
> > +
> >    int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
> >    {
> > -    ink_cond_signal (&Log::flush_cond);
> > -    return EVENT_CONT;
> > +      for (int i = 0; i < m_preproc_threads; i++) {
> > +        ink_cond_signal (&Log::preproc_cond[i]);
> > +      }
> > +      for (int i = 0; i < m_flush_threads; i++) {
> > +        ink_cond_signal (&Log::flush_cond[i]);
> > +      }
> > +      return EVENT_CONT;
> >    }
> >
> > -  PeriodicWakeup () : Continuation (new_ProxyMutex())
> > +  PeriodicWakeup (int preproc_threads, int flush_threads) :
> > +    Continuation (new_ProxyMutex()),
> > +    m_preproc_threads(preproc_threads),
> > +    m_flush_threads(flush_threads)
> >    {
> > -    SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> > +      SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> >    }
> >  };
> >
> > @@ -286,15 +300,33 @@ Log::periodic_tasks(long time_now)
> >
>  /*-------------------------------------------------------------------------
> >    MAIN INTERFACE
> >
>  -------------------------------------------------------------------------*/
> > +struct LoggingPreprocContinuation: public Continuation
> > +{
> > +  int m_idx;
> > +
> > +  int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED
> */)
> > +  {
> > +    Log::preproc_thread_main((void *)&m_idx);
> > +    return 0;
> > +  }
> > +
> > +  LoggingPreprocContinuation(int idx):Continuation(NULL), m_idx(idx)
> > +  {
> > +    SET_HANDLER(&LoggingPreprocContinuation::mainEvent);
> > +  }
> > +};
> > +
> >  struct LoggingFlushContinuation: public Continuation
> >  {
> > +  int m_idx;
> > +
> >    int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED
> */)
> >    {
> > -    Log::flush_thread_main(NULL);
> > +    Log::flush_thread_main((void *)&m_idx);
> >      return 0;
> >    }
> >
> > -  LoggingFlushContinuation():Continuation(NULL)
> > +  LoggingFlushContinuation(int idx):Continuation(NULL), m_idx(idx)
> >    {
> >      SET_HANDLER(&LoggingFlushContinuation::mainEvent);
> >    }
> > @@ -910,6 +942,7 @@ Log::init(int flags)
> >    numInactiveObjects = 0;
> >    inactive_objects = new LogObject*[maxInactiveObjects];
> >
> > +  collation_preproc_threads = 1;
> >    collation_accept_file_descriptor = NO_FD;
> >
> >    // store the configuration flags
> > @@ -931,6 +964,7 @@ Log::init(int flags)
> >
> >      config->read_configuration_variables();
> >      collation_port = config->collation_port;
> > +    collation_preproc_threads = config->collation_preproc_threads;
> >
> >      if (config_flags & STANDALONE_COLLATOR) {
> >        logging_mode = LOG_TRANSACTIONS_ONLY;
> > @@ -959,8 +993,8 @@ Log::init(int flags)
> >      create_threads();
> >
> >  #ifndef INK_SINGLE_THREADED
> > -    eventProcessor.schedule_every(NEW (new PeriodicWakeup()),
> HRTIME_SECOND,
> > -        ET_CALL);
> > +    eventProcessor.schedule_every(NEW (new
> > PeriodicWakeup(collation_preproc_threads, 1)),
> > +                                  HRTIME_SECOND, ET_CALL);
> >  #endif
> >      init_status |= PERIODIC_WAKEUP_SCHEDULED;
> >
> > @@ -1001,9 +1035,16 @@ Log::init_when_enabled()
> >      // setup global scrap object
> >      //
> >      global_scrap_format = NEW(new LogFormat(TEXT_LOG));
> > -    global_scrap_object = NEW(new LogObject(global_scrap_format,
> > Log::config->logfile_dir, "scrapfile.log", BINARY_LOG,
> > -                                            NULL,
> > Log::config->rolling_enabled, Log::config->rolling_interval_sec,
> > -
>  Log::config->rolling_offset_hr,
> > Log::config->rolling_size_mb));
> > +    global_scrap_object =
> > +      NEW(new LogObject(global_scrap_format,
> > +                        Log::config->logfile_dir,
> > +                        "scrapfile.log",
> > +                        BINARY_LOG, NULL,
> > +                        Log::config->rolling_enabled,
> > +                        Log::config->collation_preproc_threads,
> > +                        Log::config->rolling_interval_sec,
> > +                        Log::config->rolling_offset_hr,
> > +                        Log::config->rolling_size_mb));
> >
> >      // create the flush thread and the collation thread
> >      //
> > @@ -1030,15 +1071,43 @@ Log::create_threads()
> >
> >    REC_ReadConfigInteger(stacksize,
> "proxy.config.thread.default.stacksize");
> >    if (!(init_status & THREADS_CREATED)) {
> > -    // start the flush thread
> > +
> > +    char desc[64];
> > +    preproc_mutex = new ink_mutex[collation_preproc_threads];
> > +    preproc_cond = new ink_cond[collation_preproc_threads];
> > +
> > +    size_t stacksize;
> > +    REC_ReadConfigInteger(stacksize,
> "proxy.config.thread.default.stacksize");
>
> This ^ seems like an unnecessary step, that could be pushed into ...
>
> > +
> > +    // start the preproc threads
> >      //
> >      // no need for the conditional var since it will be relying on
> >      // on the event system.
> > -    ink_mutex_init(&flush_mutex, "Flush thread mutex");
> > -    ink_cond_init(&flush_cond);
> > -    Continuation *flush_continuation = NEW(new
> LoggingFlushContinuation);
> > -    Event *flush_event = eventProcessor.spawn_thread(flush_continuation,
> > "[LOGGING]", stacksize);
> > -    flush_thread = flush_event->ethread->tid;
> > +    for (int i = 0; i < collation_preproc_threads; i++) {
> > +      sprintf(desc, "Logging preproc thread mutex[%d]", i);
> > +      ink_mutex_init(&preproc_mutex[i], desc);
> > +      ink_cond_init(&preproc_cond[i]);
> > +      Continuation *preproc_cont = NEW(new
> LoggingPreprocContinuation(i));
> > +      sprintf(desc, "[LOG_PREPROC %d]", i);
> > +      eventProcessor.spawn_thread(preproc_cont, desc, stacksize);
>
> spawn_thread() as a default.. i.e.:
>
> eventProcessor::spawn_thread(Continuation, const char * desc, int
> stacksize -1)
> {
>   size_t thread_stacksize;
>   if stacksize -1 {
>     REC_ReadConfigInteger(thread_stacksize,
> "proxy.config.thread.default.stacksize")
>   } else {
>      thread_stacksize = stacksize
>   }
>
> }
>

Yes, agreed. I think it's better to send a separate patch to improve
spawn_thread().


>
>
>
>
> >
>  /*-------------------------------------------------------------------------
> > +<<<<<<< HEAD
> >    LogFile::write_and_try_delete
> > +=======
> > +  LogFile::preproc_and_try_delete
> > +
> > +  preprocess the given buffer data before write to target file
> > +  and try to delete it when its reference become zero.
> > +>>>>>>> TS-2089: introduce configurable collation preproc threads
>
>
> To reiterate a point Leif made recently about one of *my* commits:
> We should make sure that every single commit actually compiles..
>

Yes, it's in a comment :D

Thanks for reviewing.


>
> Anyway, that's all I have for now.


> -- i
> Igor Galić
>
> Tel: +43 (0) 664 886 22 883
> Mail: i.galic@brainsware.org
> URL: http://brainsware.org/
> GPG: 6880 4155 74BD FD7C B515  2EA5 4B1D 9E08 A097 C9AE
>



-- 
Yunkai Zhang
Work at Taobao