Posted to dev@trafficserver.apache.org by Igor Galić <i....@brainsware.org> on 2013/08/16 12:11:49 UTC
Re: git commit: TS-2089: introduce configurable collation preproc threads
----- Original Message -----
> Updated Branches:
> refs/heads/master e0ec5304d -> 8a1112881
>
>
> TS-2089: introduce configurable collation preproc threads
>
> We found that the logging thread's CPU usage could easily reach 100% on the
> collation host, while disk I/O stayed low at the same time.
>
> The bottleneck in the logging thread is that some preprocessing jobs, such
> as converting a LogBuffer to ASCII text, consume a lot of CPU time. Worse
> still, the write() operation blocks the logging thread.
>
> So this patch tries to split the logging thread into two parts:
> 1) Configurable preproc threads, which are responsible for all of the
> preparation work, and then forward the preprocessed buffers to the flush
> thread, or send them to CollationClient/HostSM.
>
> 2) One flush thread, which consumes the preprocessed buffers and writes them
> to disk. In our testing, one flush thread was enough for us.
>
> TODO: This patch supports only one flush thread; we can improve it to
> "one flush thread per file/disk" in the future.
>
> == How to configure ==
> The number of preproc threads is 1 by default.
>
> Please modify the "proxy.config.log.collation_preproc_threads" option to
> change it.
First off: I *love* your commit messages.
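The split described in the commit message (several CPU-bound preproc threads feeding a single flush thread) can be sketched roughly as follows. This is only an illustration using standard C++11 threads, not the code from the patch: FlushQueue, Preprocessed and run_pipeline are hypothetical names, and the real implementation uses ATS's own ink_mutex/ink_cond and an InkAtomicList.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a preprocessed LogBuffer: just the ASCII text.
using Preprocessed = std::string;

// Blocking queue shared by the preproc threads (producers) and the
// single flush thread (consumer).
class FlushQueue {
public:
  void push(Preprocessed p) {
    {
      std::lock_guard<std::mutex> lock(m_);
      q_.push(std::move(p));
    }
    cv_.notify_one();
  }
  // Signals that no more buffers will arrive.
  void close() {
    {
      std::lock_guard<std::mutex> lock(m_);
      closed_ = true;
    }
    cv_.notify_all();
  }
  // Returns false once the queue is closed and fully drained.
  bool pop(Preprocessed &out) {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return closed_ || !q_.empty(); });
    if (q_.empty()) return false;
    out = std::move(q_.front());
    q_.pop();
    return true;
  }
private:
  std::mutex m_;
  std::condition_variable cv_;
  std::queue<Preprocessed> q_;
  bool closed_ = false;
};

// Runs num_preproc worker threads plus one flush thread and returns the
// entries in the order the flush thread "wrote" them.
std::vector<std::string> run_pipeline(const std::vector<int> &raw, int num_preproc) {
  assert(num_preproc >= 1);
  FlushQueue queue;
  std::vector<std::string> written;

  std::thread flusher([&] {
    Preprocessed p;
    while (queue.pop(p)) written.push_back(p); // stands in for write()
  });

  std::vector<std::thread> workers;
  for (int t = 0; t < num_preproc; ++t) {
    workers.emplace_back([&, t] {
      // Each worker handles every num_preproc-th raw entry.
      for (std::size_t i = t; i < raw.size(); i += num_preproc)
        queue.push("entry " + std::to_string(raw[i])); // stands in for binary -> ASCII
    });
  }
  for (auto &w : workers) w.join();
  queue.close();
  flusher.join();
  return written;
}
```

With more than one preproc thread the flush order is not deterministic, which is one reason a design like this has to decide whether log ordering matters.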
> Signed-off-by: Yunkai Zhang <qi...@taobao.com>
>
>
> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/8a111288
> Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/8a111288
> Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/8a111288
>
> Branch: refs/heads/master
> Commit: 8a1112881813b5a09e0de8f51770f99337febcfc
> Parents: e0ec530
> Author: Yunkai Zhang <qi...@taobao.com>
> Authored: Sun Aug 11 16:42:32 2013 +0800
> Committer: Yunkai Zhang <qi...@taobao.com>
> Committed: Fri Aug 16 10:41:16 2013 +0800
>
> ----------------------------------------------------------------------
> CHANGES | 3 +
> mgmt/RecordsConfig.cc | 2 +
> mgmt/cli/ShowCmd.cc | 3 +
> proxy/logging/Log.cc | 257 ++++++++++++++++++++++++-----
> proxy/logging/Log.h | 46 +++++-
> proxy/logging/LogBufferSink.h | 9 +-
> proxy/logging/LogCollationClientSM.cc | 6 +-
> proxy/logging/LogCollationHostSM.cc | 4 +-
> proxy/logging/LogConfig.cc | 19 ++-
> proxy/logging/LogConfig.h | 1 +
> proxy/logging/LogFile.cc | 121 ++++++--------
> proxy/logging/LogFile.h | 20 +--
> proxy/logging/LogHost.cc | 66 ++++----
> proxy/logging/LogHost.h | 14 +-
> proxy/logging/LogObject.cc | 68 ++++----
> proxy/logging/LogObject.h | 43 +++--
> 16 files changed, 450 insertions(+), 232 deletions(-)
> ----------------------------------------------------------------------
I'm missing a change to doc/reference/configuration/records.config.en.rst
We should make it a habit of adding documentation in the same commit as
new records.config changes.
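For reference, and assuming the usual records.config line syntax, raising the number of preproc threads would look something like the line below. The value 4 is only an example; the RecordsConfig.cc hunk in this commit restricts the setting to the range [1-128] and defaults it to 1.

```
CONFIG proxy.config.log.collation_preproc_threads INT 4
```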
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/CHANGES
> ----------------------------------------------------------------------
> diff --git a/CHANGES b/CHANGES
> index 6b9a3bb..c613f00 100644
> --- a/CHANGES
> +++ b/CHANGES
> @@ -1,6 +1,9 @@
> -*- coding: utf-8
> -*-
> Changes with Apache Traffic Server 3.5.0
>
> +
> + *) TS-2089: introduce configurable collation preproc threads
> +
> *) [TS-2132, TS-2131] ${libexecdir} and $(localstatedir} chowned
> needlessly chowned to to ATS' user.
> Author: Tomasz Kuzemko <to...@kuzemko.net>
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/RecordsConfig.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
> index cfc2267..9402581 100644
> --- a/mgmt/RecordsConfig.cc
> +++ b/mgmt/RecordsConfig.cc
> @@ -1147,6 +1147,8 @@ RecordElement RecordsConfig[] = {
> ,
> {RECT_CONFIG, "proxy.config.log.collation_max_send_buffers", RECD_INT,
> "16", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
> ,
> + {RECT_CONFIG, "proxy.config.log.collation_preproc_threads", RECD_INT, "1",
> RECU_DYNAMIC, RR_REQUIRED, RECC_INT, "[1-128]", RECA_NULL}
> + ,
> {RECT_CONFIG, "proxy.config.log.rolling_enabled", RECD_INT, "1",
> RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-4]", RECA_NULL}
> ,
> {RECT_CONFIG, "proxy.config.log.rolling_interval_sec", RECD_INT, "86400",
> RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/cli/ShowCmd.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/cli/ShowCmd.cc b/mgmt/cli/ShowCmd.cc
> index 0798c43..ed71560 100644
> --- a/mgmt/cli/ShowCmd.cc
> +++ b/mgmt/cli/ShowCmd.cc
> @@ -1555,6 +1555,7 @@ ShowLogging()
> TSInt collation_port = -1;
> TSString collation_secret = NULL;
> TSInt host_tag = 0;
> + TSInt preproc_threads = 0;
> TSInt orphan_space = -1;
>
> TSInt squid_log = 0;
> @@ -1596,6 +1597,7 @@ ShowLogging()
> Cli_RecordGetString("proxy.config.log.collation_secret",
> &collation_secret);
> Cli_RecordGetInt("proxy.config.log.collation_host_tagged", &host_tag);
> Cli_RecordGetInt("proxy.config.log.max_space_mb_for_orphan_logs",
> &orphan_space);
> + Cli_RecordGetInt("proxy.config.log.collation_preproc_threads",
> &preproc_threads);
>
> Cli_RecordGetInt("proxy.config.log.squid_log_enabled", &squid_log);
> Cli_RecordGetInt("proxy.config.log.squid_log_is_ascii", &is_ascii);
> @@ -1657,6 +1659,7 @@ ShowLogging()
> Cli_Printf(" Port ----------------------------------- %d\n",
> collation_port);
> Cli_Printf(" Secret --------------------------------- %s\n",
> collation_secret);
> Cli_PrintEnable(" Host Tagged ---------------------------- ", host_tag);
> + Cli_PrintEnable(" Preproc Threads ------------------------ ",
> preproc_threads);
> Cli_Printf(" Space Limit for Orphan Files ----------- %d MB\n",
> orphan_space);
>
> Cli_PrintEnable("\nSquid Format ----------------------------- ",
> squid_log);
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/Log.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
> index 94d5625..1361978 100644
> --- a/proxy/logging/Log.cc
> +++ b/proxy/logging/Log.cc
> @@ -77,16 +77,18 @@ size_t Log::numInactiveObjects;
> size_t Log::maxInactiveObjects;
>
> // Flush thread stuff
> -volatile unsigned long Log::flush_counter = 0;
This bit is fascinating: We're replacing the (seemingly)
unused volatile variable flush_counter with the already
existing variable m_bytes_written..
We turn that variable into a volatile, and continue to
use it as if nothing happened. But here's the question:
why was flush_counter not used?
> -ink_mutex Log::flush_mutex;
> -ink_cond Log::flush_cond;
> -ink_thread Log::flush_thread;
> +ink_mutex *Log::preproc_mutex;
> +ink_cond *Log::preproc_cond;
> +ink_mutex *Log::flush_mutex;
> +ink_cond *Log::flush_cond;
> +InkAtomicList *Log::flush_data_list;
>
> // Collate thread stuff
> ink_mutex Log::collate_mutex;
> ink_cond Log::collate_cond;
> ink_thread Log::collate_thread;
> int Log::collation_accept_file_descriptor;
> +int Log::collation_preproc_threads;
> int Log::collation_port;
>
> // Log private objects
> @@ -179,16 +181,28 @@ Log::add_to_inactive(LogObject * object)
>
> struct PeriodicWakeup;
> typedef int (PeriodicWakeup::*PeriodicWakeupHandler)(int, void *);
> -struct PeriodicWakeup : Continuation {
> +struct PeriodicWakeup : Continuation
> +{
> + int m_preproc_threads;
> + int m_flush_threads;
> +
> int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
> {
> - ink_cond_signal (&Log::flush_cond);
> - return EVENT_CONT;
> + for (int i = 0; i < m_preproc_threads; i++) {
> + ink_cond_signal (&Log::preproc_cond[i]);
> + }
> + for (int i = 0; i < m_flush_threads; i++) {
> + ink_cond_signal (&Log::flush_cond[i]);
> + }
> + return EVENT_CONT;
> }
>
> - PeriodicWakeup () : Continuation (new_ProxyMutex())
> + PeriodicWakeup (int preproc_threads, int flush_threads) :
> + Continuation (new_ProxyMutex()),
> + m_preproc_threads(preproc_threads),
> + m_flush_threads(flush_threads)
> {
> - SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> + SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> }
> };
>
> @@ -286,15 +300,33 @@ Log::periodic_tasks(long time_now)
> /*-------------------------------------------------------------------------
> MAIN INTERFACE
> -------------------------------------------------------------------------*/
> +struct LoggingPreprocContinuation: public Continuation
> +{
> + int m_idx;
> +
> + int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
> + {
> + Log::preproc_thread_main((void *)&m_idx);
> + return 0;
> + }
> +
> + LoggingPreprocContinuation(int idx):Continuation(NULL), m_idx(idx)
> + {
> + SET_HANDLER(&LoggingPreprocContinuation::mainEvent);
> + }
> +};
> +
> struct LoggingFlushContinuation: public Continuation
> {
> + int m_idx;
> +
> int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
> {
> - Log::flush_thread_main(NULL);
> + Log::flush_thread_main((void *)&m_idx);
> return 0;
> }
>
> - LoggingFlushContinuation():Continuation(NULL)
> + LoggingFlushContinuation(int idx):Continuation(NULL), m_idx(idx)
> {
> SET_HANDLER(&LoggingFlushContinuation::mainEvent);
> }
> @@ -910,6 +942,7 @@ Log::init(int flags)
> numInactiveObjects = 0;
> inactive_objects = new LogObject*[maxInactiveObjects];
>
> + collation_preproc_threads = 1;
> collation_accept_file_descriptor = NO_FD;
>
> // store the configuration flags
> @@ -931,6 +964,7 @@ Log::init(int flags)
>
> config->read_configuration_variables();
> collation_port = config->collation_port;
> + collation_preproc_threads = config->collation_preproc_threads;
>
> if (config_flags & STANDALONE_COLLATOR) {
> logging_mode = LOG_TRANSACTIONS_ONLY;
> @@ -959,8 +993,8 @@ Log::init(int flags)
> create_threads();
>
> #ifndef INK_SINGLE_THREADED
> - eventProcessor.schedule_every(NEW (new PeriodicWakeup()), HRTIME_SECOND,
> - ET_CALL);
> + eventProcessor.schedule_every(NEW (new
> PeriodicWakeup(collation_preproc_threads, 1)),
> + HRTIME_SECOND, ET_CALL);
> #endif
> init_status |= PERIODIC_WAKEUP_SCHEDULED;
>
> @@ -1001,9 +1035,16 @@ Log::init_when_enabled()
> // setup global scrap object
> //
> global_scrap_format = NEW(new LogFormat(TEXT_LOG));
> - global_scrap_object = NEW(new LogObject(global_scrap_format,
> Log::config->logfile_dir, "scrapfile.log", BINARY_LOG,
> - NULL,
> Log::config->rolling_enabled, Log::config->rolling_interval_sec,
> - Log::config->rolling_offset_hr,
> Log::config->rolling_size_mb));
> + global_scrap_object =
> + NEW(new LogObject(global_scrap_format,
> + Log::config->logfile_dir,
> + "scrapfile.log",
> + BINARY_LOG, NULL,
> + Log::config->rolling_enabled,
> + Log::config->collation_preproc_threads,
> + Log::config->rolling_interval_sec,
> + Log::config->rolling_offset_hr,
> + Log::config->rolling_size_mb));
>
> // create the flush thread and the collation thread
> //
> @@ -1030,15 +1071,43 @@ Log::create_threads()
>
> REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
> if (!(init_status & THREADS_CREATED)) {
> - // start the flush thread
> +
> + char desc[64];
> + preproc_mutex = new ink_mutex[collation_preproc_threads];
> + preproc_cond = new ink_cond[collation_preproc_threads];
> +
> + size_t stacksize;
> + REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
This ^ seems like an unnecessary step, that could be pushed into ...
> +
> + // start the preproc threads
> //
> // no need for the conditional var since it will be relying on
> // on the event system.
> - ink_mutex_init(&flush_mutex, "Flush thread mutex");
> - ink_cond_init(&flush_cond);
> - Continuation *flush_continuation = NEW(new LoggingFlushContinuation);
> - Event *flush_event = eventProcessor.spawn_thread(flush_continuation,
> "[LOGGING]", stacksize);
> - flush_thread = flush_event->ethread->tid;
> + for (int i = 0; i < collation_preproc_threads; i++) {
> + sprintf(desc, "Logging preproc thread mutex[%d]", i);
> + ink_mutex_init(&preproc_mutex[i], desc);
> + ink_cond_init(&preproc_cond[i]);
> + Continuation *preproc_cont = NEW(new LoggingPreprocContinuation(i));
> + sprintf(desc, "[LOG_PREPROC %d]", i);
> + eventProcessor.spawn_thread(preproc_cont, desc, stacksize);
spawn_thread() as a default.. i.e.:
Event *
EventProcessor::spawn_thread(Continuation *cont, const char *desc, int stacksize = -1)
{
  size_t thread_stacksize;
  if (stacksize == -1) {
    REC_ReadConfigInteger(thread_stacksize, "proxy.config.thread.default.stacksize");
  } else {
    thread_stacksize = stacksize;
  }
  // ... spawn the thread with thread_stacksize ...
}
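The fallback suggested above can be shown as a small standalone function. This is a sketch only: read_default_stacksize() is a hypothetical stand-in for reading "proxy.config.thread.default.stacksize", the 1 MB value is made up, and 0 (rather than -1) is used as the "not specified" marker so the parameter can stay unsigned.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for REC_ReadConfigInteger() on
// "proxy.config.thread.default.stacksize"; the value is illustrative.
static std::size_t read_default_stacksize() { return 1048576; }

// Resolves the stack size a spawn_thread()-like function would use.
// A requested size of 0 means "caller did not specify one".
std::size_t resolve_stacksize(std::size_t requested = 0) {
  if (requested != 0) return requested;
  std::size_t fallback = read_default_stacksize();
  assert(fallback > 0); // a zero default would indicate a config problem
  return fallback;
}
```

Callers that do not care about the stack size could then simply omit the argument, which is the point of pushing the config lookup into spawn_thread().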
> /*-------------------------------------------------------------------------
> +<<<<<<< HEAD
> LogFile::write_and_try_delete
> +=======
> + LogFile::preproc_and_try_delete
> +
> + preprocess the given buffer data before write to target file
> + and try to delete it when its reference become zero.
> +>>>>>>> TS-2089: introduce configurable collation preproc threads
To reiterate a point Leif made recently about one of *my* commits:
We should make sure that every single commit actually compiles..
Anyway, that's all I have for now.
-- i
Igor Galić
Tel: +43 (0) 664 886 22 883
Mail: i.galic@brainsware.org
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515 2EA5 4B1D 9E08 A097 C9AE
Re: git commit: TS-2089: introduce configurable collation preproc threads
Posted by Igor Galić <i....@brainsware.org>.
>
> > /*-------------------------------------------------------------------------
> > +<<<<<<< HEAD
> > LogFile::write_and_try_delete
> > +=======
> > + LogFile::preproc_and_try_delete
> > +
> > + preprocess the given buffer data before write to target file
> > + and try to delete it when its reference become zero.
> > +>>>>>>> TS-2089: introduce configurable collation preproc threads
>
>
> To reiterate a point Leif made recently about one of *my* commits:
> We should make sure that every single commit actually compiles..
>
Looking at this a second time, I realize this was in a comment,
so I feel a little foolish now, but we still might want to fix that.
--
Igor Galić
Tel: +43 (0) 664 886 22 883
Mail: i.galic@brainsware.org
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515 2EA5 4B1D 9E08 A097 C9AE
Re: git commit: TS-2089: introduce configurable collation preproc threads
Posted by Yunkai Zhang <yu...@gmail.com>.
On Fri, Aug 16, 2013 at 6:11 PM, Igor Galić <i....@brainsware.org> wrote:
>
>
> ----- Original Message -----
> > Updated Branches:
> > refs/heads/master e0ec5304d -> 8a1112881
> >
> >
> > TS-2089: introduce configurable collation preproc threads
> >
> > We found that the logging thread's CPU usage could easily reach 100% on the
> > collation host, while disk I/O stayed low at the same time.
> >
> > The bottleneck in the logging thread is that some preprocessing jobs, such
> > as converting a LogBuffer to ASCII text, consume a lot of CPU time. Worse
> > still, the write() operation blocks the logging thread.
> >
> > So this patch tries to split the logging thread into two parts:
> > 1) Configurable preproc threads, which are responsible for all of the
> > preparation work, and then forward the preprocessed buffers to the flush
> > thread, or send them to CollationClient/HostSM.
> >
> > 2) One flush thread, which consumes the preprocessed buffers and writes them
> > to disk. In our testing, one flush thread was enough for us.
> >
> > TODO: This patch supports only one flush thread; we can improve it to
> > "one flush thread per file/disk" in the future.
> >
> > == How to configure ==
> > The number of preproc threads is 1 by default.
> >
> > Please modify the "proxy.config.log.collation_preproc_threads" option to
> > change it.
>
>
> First off: I *love* your commit messages.
>
> > Signed-off-by: Yunkai Zhang <qi...@taobao.com>
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> > Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/8a111288
> > Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/8a111288
> > Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/8a111288
> >
> > Branch: refs/heads/master
> > Commit: 8a1112881813b5a09e0de8f51770f99337febcfc
> > Parents: e0ec530
> > Author: Yunkai Zhang <qi...@taobao.com>
> > Authored: Sun Aug 11 16:42:32 2013 +0800
> > Committer: Yunkai Zhang <qi...@taobao.com>
> > Committed: Fri Aug 16 10:41:16 2013 +0800
> >
> > ----------------------------------------------------------------------
> > CHANGES | 3 +
> > mgmt/RecordsConfig.cc | 2 +
> > mgmt/cli/ShowCmd.cc | 3 +
> > proxy/logging/Log.cc | 257 ++++++++++++++++++++++++-----
> > proxy/logging/Log.h | 46 +++++-
> > proxy/logging/LogBufferSink.h | 9 +-
> > proxy/logging/LogCollationClientSM.cc | 6 +-
> > proxy/logging/LogCollationHostSM.cc | 4 +-
> > proxy/logging/LogConfig.cc | 19 ++-
> > proxy/logging/LogConfig.h | 1 +
> > proxy/logging/LogFile.cc | 121 ++++++--------
> > proxy/logging/LogFile.h | 20 +--
> > proxy/logging/LogHost.cc | 66 ++++----
> > proxy/logging/LogHost.h | 14 +-
> > proxy/logging/LogObject.cc | 68 ++++----
> > proxy/logging/LogObject.h | 43 +++--
> > 16 files changed, 450 insertions(+), 232 deletions(-)
> > ----------------------------------------------------------------------
>
> I'm missing a change to doc/reference/configuration/records.config.en.rst
> We should make it a habit of adding documentation in the same commit as
> new records.config changes.
>
I didn't notice the "records.config.en.rst" file, sorry.
I'll update it later.
>
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/CHANGES
> > ----------------------------------------------------------------------
> > diff --git a/CHANGES b/CHANGES
> > index 6b9a3bb..c613f00 100644
> > --- a/CHANGES
> > +++ b/CHANGES
> > @@ -1,6 +1,9 @@
> > -*- coding:
> utf-8
> > -*-
> > Changes with Apache Traffic Server 3.5.0
> >
> > +
> > + *) TS-2089: introduce configurable collation preproc threads
> > +
> > *) [TS-2132, TS-2131] ${libexecdir} and $(localstatedir} chowned
> > needlessly chowned to to ATS' user.
> > Author: Tomasz Kuzemko <to...@kuzemko.net>
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/RecordsConfig.cc
> > ----------------------------------------------------------------------
> > diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
> > index cfc2267..9402581 100644
> > --- a/mgmt/RecordsConfig.cc
> > +++ b/mgmt/RecordsConfig.cc
> > @@ -1147,6 +1147,8 @@ RecordElement RecordsConfig[] = {
> > ,
> > {RECT_CONFIG, "proxy.config.log.collation_max_send_buffers", RECD_INT,
> > "16", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
> > ,
> > + {RECT_CONFIG, "proxy.config.log.collation_preproc_threads", RECD_INT,
> "1",
> > RECU_DYNAMIC, RR_REQUIRED, RECC_INT, "[1-128]", RECA_NULL}
> > + ,
> > {RECT_CONFIG, "proxy.config.log.rolling_enabled", RECD_INT, "1",
> > RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-4]", RECA_NULL}
> > ,
> > {RECT_CONFIG, "proxy.config.log.rolling_interval_sec", RECD_INT,
> "86400",
> > RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/cli/ShowCmd.cc
> > ----------------------------------------------------------------------
> > diff --git a/mgmt/cli/ShowCmd.cc b/mgmt/cli/ShowCmd.cc
> > index 0798c43..ed71560 100644
> > --- a/mgmt/cli/ShowCmd.cc
> > +++ b/mgmt/cli/ShowCmd.cc
> > @@ -1555,6 +1555,7 @@ ShowLogging()
> > TSInt collation_port = -1;
> > TSString collation_secret = NULL;
> > TSInt host_tag = 0;
> > + TSInt preproc_threads = 0;
> > TSInt orphan_space = -1;
> >
> > TSInt squid_log = 0;
> > @@ -1596,6 +1597,7 @@ ShowLogging()
> > Cli_RecordGetString("proxy.config.log.collation_secret",
> > &collation_secret);
> > Cli_RecordGetInt("proxy.config.log.collation_host_tagged", &host_tag);
> > Cli_RecordGetInt("proxy.config.log.max_space_mb_for_orphan_logs",
> > &orphan_space);
> > + Cli_RecordGetInt("proxy.config.log.collation_preproc_threads",
> > &preproc_threads);
> >
> > Cli_RecordGetInt("proxy.config.log.squid_log_enabled", &squid_log);
> > Cli_RecordGetInt("proxy.config.log.squid_log_is_ascii", &is_ascii);
> > @@ -1657,6 +1659,7 @@ ShowLogging()
> > Cli_Printf(" Port ----------------------------------- %d\n",
> > collation_port);
> > Cli_Printf(" Secret --------------------------------- %s\n",
> > collation_secret);
> > Cli_PrintEnable(" Host Tagged ---------------------------- ",
> host_tag);
> > + Cli_PrintEnable(" Preproc Threads ------------------------ ",
> > preproc_threads);
> > Cli_Printf(" Space Limit for Orphan Files ----------- %d MB\n",
> > orphan_space);
> >
> > Cli_PrintEnable("\nSquid Format ----------------------------- ",
> > squid_log);
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/Log.cc
> > ----------------------------------------------------------------------
> > diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
> > index 94d5625..1361978 100644
> > --- a/proxy/logging/Log.cc
> > +++ b/proxy/logging/Log.cc
> > @@ -77,16 +77,18 @@ size_t Log::numInactiveObjects;
> > size_t Log::maxInactiveObjects;
> >
> > // Flush thread stuff
> > -volatile unsigned long Log::flush_counter = 0;
>
> This bit is fascinating: We're replacing the (seemingly)
> unused volatile variable flush_counter with the already
> existing variable m_bytes_written..
>
> We turn that variable into a volatile, and continue to
> use it as if nothing happened. But here's the question:
> why was flush_counter not used?
>
Log::flush_counter is unused in the original code. I don't know why it
exists; it seems to be leftover, deprecated code.
LogFile::m_bytes_written was changed to volatile because we now have multiple
preproc threads and one flush thread, and those threads may change
m_bytes_written concurrently.
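One caveat worth illustrating: in C++, volatile by itself does not make a read-modify-write such as "+=" atomic, so a counter updated from several threads usually needs an atomic operation as well. The sketch below assumes a C++11 toolchain and uses std::atomic; ByteCounter and count_concurrently are hypothetical names, and it does not show what the patch or the ATS tree actually does.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical stand-in for LogFile; only the byte counter is modeled.
struct ByteCounter {
  std::atomic<std::uint64_t> bytes_written{0};

  // Atomic add, safe to call from any number of threads.
  void add(std::uint64_t n) {
    bytes_written.fetch_add(n, std::memory_order_relaxed);
  }
};

// Has num_threads threads each record writes_per_thread writes of
// bytes_per_write bytes, then returns the final total.
std::uint64_t count_concurrently(int num_threads, int writes_per_thread,
                                 std::uint64_t bytes_per_write) {
  assert(num_threads >= 1);
  ByteCounter counter;
  std::vector<std::thread> threads;
  for (int t = 0; t < num_threads; ++t) {
    threads.emplace_back([&] {
      for (int i = 0; i < writes_per_thread; ++i) counter.add(bytes_per_write);
    });
  }
  for (auto &th : threads) th.join();
  return counter.bytes_written.load();
}
```

With a plain or volatile integer in place of the atomic, concurrent increments could be lost and the total would not be reliable.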
> > -ink_mutex Log::flush_mutex;
> > -ink_cond Log::flush_cond;
> > -ink_thread Log::flush_thread;
> > +ink_mutex *Log::preproc_mutex;
> > +ink_cond *Log::preproc_cond;
> > +ink_mutex *Log::flush_mutex;
> > +ink_cond *Log::flush_cond;
> > +InkAtomicList *Log::flush_data_list;
> >
> > // Collate thread stuff
> > ink_mutex Log::collate_mutex;
> > ink_cond Log::collate_cond;
> > ink_thread Log::collate_thread;
> > int Log::collation_accept_file_descriptor;
> > +int Log::collation_preproc_threads;
> > int Log::collation_port;
> >
> > // Log private objects
> > @@ -179,16 +181,28 @@ Log::add_to_inactive(LogObject * object)
> >
> > struct PeriodicWakeup;
> > typedef int (PeriodicWakeup::*PeriodicWakeupHandler)(int, void *);
> > -struct PeriodicWakeup : Continuation {
> > +struct PeriodicWakeup : Continuation
> > +{
> > + int m_preproc_threads;
> > + int m_flush_threads;
> > +
> > int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
> > {
> > - ink_cond_signal (&Log::flush_cond);
> > - return EVENT_CONT;
> > + for (int i = 0; i < m_preproc_threads; i++) {
> > + ink_cond_signal (&Log::preproc_cond[i]);
> > + }
> > + for (int i = 0; i < m_flush_threads; i++) {
> > + ink_cond_signal (&Log::flush_cond[i]);
> > + }
> > + return EVENT_CONT;
> > }
> >
> > - PeriodicWakeup () : Continuation (new_ProxyMutex())
> > + PeriodicWakeup (int preproc_threads, int flush_threads) :
> > + Continuation (new_ProxyMutex()),
> > + m_preproc_threads(preproc_threads),
> > + m_flush_threads(flush_threads)
> > {
> > - SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> > + SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> > }
> > };
> >
> > @@ -286,15 +300,33 @@ Log::periodic_tasks(long time_now)
> >
> /*-------------------------------------------------------------------------
> > MAIN INTERFACE
> >
> -------------------------------------------------------------------------*/
> > +struct LoggingPreprocContinuation: public Continuation
> > +{
> > + int m_idx;
> > +
> > + int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED
> */)
> > + {
> > + Log::preproc_thread_main((void *)&m_idx);
> > + return 0;
> > + }
> > +
> > + LoggingPreprocContinuation(int idx):Continuation(NULL), m_idx(idx)
> > + {
> > + SET_HANDLER(&LoggingPreprocContinuation::mainEvent);
> > + }
> > +};
> > +
> > struct LoggingFlushContinuation: public Continuation
> > {
> > + int m_idx;
> > +
> > int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED
> */)
> > {
> > - Log::flush_thread_main(NULL);
> > + Log::flush_thread_main((void *)&m_idx);
> > return 0;
> > }
> >
> > - LoggingFlushContinuation():Continuation(NULL)
> > + LoggingFlushContinuation(int idx):Continuation(NULL), m_idx(idx)
> > {
> > SET_HANDLER(&LoggingFlushContinuation::mainEvent);
> > }
> > @@ -910,6 +942,7 @@ Log::init(int flags)
> > numInactiveObjects = 0;
> > inactive_objects = new LogObject*[maxInactiveObjects];
> >
> > + collation_preproc_threads = 1;
> > collation_accept_file_descriptor = NO_FD;
> >
> > // store the configuration flags
> > @@ -931,6 +964,7 @@ Log::init(int flags)
> >
> > config->read_configuration_variables();
> > collation_port = config->collation_port;
> > + collation_preproc_threads = config->collation_preproc_threads;
> >
> > if (config_flags & STANDALONE_COLLATOR) {
> > logging_mode = LOG_TRANSACTIONS_ONLY;
> > @@ -959,8 +993,8 @@ Log::init(int flags)
> > create_threads();
> >
> > #ifndef INK_SINGLE_THREADED
> > - eventProcessor.schedule_every(NEW (new PeriodicWakeup()),
> HRTIME_SECOND,
> > - ET_CALL);
> > + eventProcessor.schedule_every(NEW (new
> > PeriodicWakeup(collation_preproc_threads, 1)),
> > + HRTIME_SECOND, ET_CALL);
> > #endif
> > init_status |= PERIODIC_WAKEUP_SCHEDULED;
> >
> > @@ -1001,9 +1035,16 @@ Log::init_when_enabled()
> > // setup global scrap object
> > //
> > global_scrap_format = NEW(new LogFormat(TEXT_LOG));
> > - global_scrap_object = NEW(new LogObject(global_scrap_format,
> > Log::config->logfile_dir, "scrapfile.log", BINARY_LOG,
> > - NULL,
> > Log::config->rolling_enabled, Log::config->rolling_interval_sec,
> > -
> Log::config->rolling_offset_hr,
> > Log::config->rolling_size_mb));
> > + global_scrap_object =
> > + NEW(new LogObject(global_scrap_format,
> > + Log::config->logfile_dir,
> > + "scrapfile.log",
> > + BINARY_LOG, NULL,
> > + Log::config->rolling_enabled,
> > + Log::config->collation_preproc_threads,
> > + Log::config->rolling_interval_sec,
> > + Log::config->rolling_offset_hr,
> > + Log::config->rolling_size_mb));
> >
> > // create the flush thread and the collation thread
> > //
> > @@ -1030,15 +1071,43 @@ Log::create_threads()
> >
> > REC_ReadConfigInteger(stacksize,
> "proxy.config.thread.default.stacksize");
> > if (!(init_status & THREADS_CREATED)) {
> > - // start the flush thread
> > +
> > + char desc[64];
> > + preproc_mutex = new ink_mutex[collation_preproc_threads];
> > + preproc_cond = new ink_cond[collation_preproc_threads];
> > +
> > + size_t stacksize;
> > + REC_ReadConfigInteger(stacksize,
> "proxy.config.thread.default.stacksize");
>
> This ^ seems like an unnecessary step, that could be pushed into ...
>
> > +
> > + // start the preproc threads
> > //
> > // no need for the conditional var since it will be relying on
> > // on the event system.
> > - ink_mutex_init(&flush_mutex, "Flush thread mutex");
> > - ink_cond_init(&flush_cond);
> > - Continuation *flush_continuation = NEW(new
> LoggingFlushContinuation);
> > - Event *flush_event = eventProcessor.spawn_thread(flush_continuation,
> > "[LOGGING]", stacksize);
> > - flush_thread = flush_event->ethread->tid;
> > + for (int i = 0; i < collation_preproc_threads; i++) {
> > + sprintf(desc, "Logging preproc thread mutex[%d]", i);
> > + ink_mutex_init(&preproc_mutex[i], desc);
> > + ink_cond_init(&preproc_cond[i]);
> > + Continuation *preproc_cont = NEW(new
> LoggingPreprocContinuation(i));
> > + sprintf(desc, "[LOG_PREPROC %d]", i);
> > + eventProcessor.spawn_thread(preproc_cont, desc, stacksize);
>
> spawn_thread() as a default.. i.e.:
>
> Event *
> EventProcessor::spawn_thread(Continuation *cont, const char *desc, int stacksize = -1)
> {
>   size_t thread_stacksize;
>   if (stacksize == -1) {
>     REC_ReadConfigInteger(thread_stacksize, "proxy.config.thread.default.stacksize");
>   } else {
>     thread_stacksize = stacksize;
>   }
>   // ... spawn the thread with thread_stacksize ...
> }
>
Yes, agreed. I think it's better to submit a separate patch to improve
spawn_thread().
>
>
>
>
> >
> /*-------------------------------------------------------------------------
> > +<<<<<<< HEAD
> > LogFile::write_and_try_delete
> > +=======
> > + LogFile::preproc_and_try_delete
> > +
> > + preprocess the given buffer data before write to target file
> > + and try to delete it when its reference become zero.
> > +>>>>>>> TS-2089: introduce configurable collation preproc threads
>
>
> To reiterate a point Leif made recently about one of *my* commits:
> We should make sure that every single commit actually compiles..
>
Yes, it's in a comment :D
Thanks for the review.
>
> Anyway, that's all I have for now.
> -- i
> Igor Galić
>
> Tel: +43 (0) 664 886 22 883
> Mail: i.galic@brainsware.org
> URL: http://brainsware.org/
> GPG: 6880 4155 74BD FD7C B515 2EA5 4B1D 9E08 A097 C9AE
>
--
Yunkai Zhang
Work at Taobao