You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/04/11 01:10:41 UTC
[couchdb] branch 63012-scheduler updated: [fixup] optimize stats
updater in scheduler
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/63012-scheduler by this push:
new 40b0fbd [fixup] optimize stats updater in scheduler
40b0fbd is described below
commit 40b0fbdad7b72031a1898339391f4ce4fa0ca216
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon Apr 10 21:10:32 2017 -0400
[fixup] optimize stats updater in scheduler
---
src/couch_replicator/elvis.config | 25 +++
src/couch_replicator/priv/stats_descriptions.cfg | 12 --
.../src/couch_replicator_scheduler.erl | 167 +++++++++++----------
3 files changed, 115 insertions(+), 89 deletions(-)
diff --git a/src/couch_replicator/elvis.config b/src/couch_replicator/elvis.config
new file mode 100644
index 0000000..efacbb7
--- /dev/null
+++ b/src/couch_replicator/elvis.config
@@ -0,0 +1,25 @@
+[
+ {
+ elvis,
+ [
+ {config,
+ [#{dirs => ["src"],
+ filter => "*.erl",
+ rules => [
+ {elvis_style, line_length, #{limit => 80}},
+ {elvis_style, no_tabs, disable},
+ {elvis_style, macro_names, disable},
+ {elvis_style, operator_spaces, disable},
+ {elvis_style, nesting_level, disable},
+ {elvis_style, no_if_expression, disable},
+ {elvis_style, no_nested_try_catch, disable},
+ {elvis_style, state_record_and_type, disable},
+ {elvis_style, no_spec_with_records, disable}
+ ],
+ ruleset => erl_files
+ }
+ ]
+ }
+ ]
+ }
+].
diff --git a/src/couch_replicator/priv/stats_descriptions.cfg b/src/couch_replicator/priv/stats_descriptions.cfg
index 777b290..627da79 100644
--- a/src/couch_replicator/priv/stats_descriptions.cfg
+++ b/src/couch_replicator/priv/stats_descriptions.cfg
@@ -126,18 +126,6 @@
{type, gauge},
{desc, <<"total number of replicator scheduler jobs">>}
]}.
-{[couch_replicator, jobs, avg_running], [
- {type, gauge},
- {desc, <<"average of length of time current jobs have been running">>}
-]}.
-{[couch_replicator, jobs, avg_pending], [
- {type, gauge},
- {desc, <<"average length of time spent waiting to run">>}
-]}.
-{[couch_replicator, jobs, avg_crashed], [
- {type, gauge},
- {desc, <<"average of length of time since last crash">>}
-]}.
{[couch_replicator, connection, acquires], [
{type, counter},
{desc, <<"number of times connections are shared">>}
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index ce17e58..2279b90 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -63,6 +63,7 @@
-define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000).
-define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60).
-define(RELISTEN_DELAY, 5000).
+-define(STATS_UPDATE_WAIT, 5000).
-define(DEFAULT_MAX_JOBS, 500).
-define(DEFAULT_MAX_CHURN, 20).
@@ -70,22 +71,20 @@
-define(DEFAULT_SCHEDULER_INTERVAL, 60000).
--record(state, {interval, timer, max_jobs, max_churn, max_history}).
+-record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}).
-record(job, {
- id :: job_id() | '$1' | '_',
- rep :: #rep{} | '_',
- pid :: undefined | pid() | '$1' | '_',
- monitor :: undefined | reference() | '_',
- history :: history() | '_'}).
+ id :: job_id() | '$1' | '_',
+ rep :: #rep{} | '_',
+ pid :: undefined | pid() | '$1' | '_',
+ monitor :: undefined | reference() | '_',
+ history :: history() | '_'
+}).
-record(stats_acc, {
- now :: erlang:timestamp(),
- pending_t = 0 :: non_neg_integer(),
- running_t = 0 :: non_neg_integer(),
- crashed_t = 0 :: non_neg_integer(),
- pending_n = 0 :: non_neg_integer(),
- running_n = 0 :: non_neg_integer(),
- crashed_n = 0 :: non_neg_integer()}).
+ pending_n = 0 :: non_neg_integer(),
+ running_n = 0 :: non_neg_integer(),
+ crashed_n = 0 :: non_neg_integer()
+}).
%% public functions
@@ -207,7 +206,8 @@ init(_) ->
max_jobs = MaxJobs,
max_churn = MaxChurn,
max_history = MaxHistory,
- timer = Timer
+ timer = Timer,
+ stats_pid = spawn_link(fun stats_updater_loop/0)
},
{ok, State}.
@@ -267,7 +267,7 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
{ok, Job} = job_by_pid(Pid),
couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
remove_job_int(Job),
- update_running_jobs_stats(),
+ update_running_jobs_stats(State#state.stats_pid),
{noreply, State};
handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
@@ -340,12 +340,12 @@ handle_config_terminate(_, _, _) ->
% in the scheduler will be retried indefinitely (with appropriate exponential
% backoffs).
-spec handle_crashed_job(#job{}, any(), #state{}) -> ok.
-handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, _State) ->
+handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, State) ->
Msg = "~p : Transient job ~p failed, removing. Error: ~p",
ErrorBinary = couch_replicator_utils:rep_error_to_binary(Reason),
couch_log:error(Msg, [?MODULE, Job#job.id, ErrorBinary]),
remove_job_int(Job),
- update_running_jobs_stats(),
+ update_running_jobs_stats(State#state.stats_pid),
ok;
handle_crashed_job(Job, Reason, State) ->
@@ -363,7 +363,7 @@ handle_crashed_job(Job, Reason, State) ->
% scheduler could be blocked if there is a cascade of lots failing
% jobs in a row.
start_pending_jobs(State),
- update_running_jobs_stats(),
+ update_running_jobs_stats(State#state.stats_pid),
ok;
false ->
ok
@@ -380,7 +380,7 @@ maybe_start_newly_added_job(Job, State) ->
case TotalJobs < MaxJobs andalso running_job_count() < MaxJobs of
true ->
start_job_int(Job, State),
- update_running_jobs_stats(),
+ update_running_jobs_stats(State#state.stats_pid),
ok;
false ->
ok
@@ -548,7 +548,7 @@ maybe_remove_job_int(JobId, State) ->
TotalJobs = ets:info(?MODULE, size),
couch_stats:update_gauge([couch_replicator, jobs, total],
TotalJobs),
- update_running_jobs_stats(),
+ update_running_jobs_stats(State#state.stats_pid),
ok;
{error, not_found} ->
ok
@@ -681,7 +681,7 @@ reschedule(State) ->
stop_excess_jobs(State, Running),
start_pending_jobs(State, Running, Pending),
rotate_jobs(State, Running, Pending),
- update_running_jobs_stats(),
+ update_running_jobs_stats(State#state.stats_pid),
ok.
@@ -742,58 +742,6 @@ update_history(Job, Type, When, State) ->
Job#job{history = History1}.
--spec update_running_jobs_stats() -> ok.
-update_running_jobs_stats() ->
- Acc0 = #stats_acc{now = os:timestamp()},
- AccR = ets:foldl(fun stats_fold/2, Acc0, ?MODULE),
- #stats_acc{
- pending_t = PendingSum,
- running_t = RunningSum,
- crashed_t = CrashedSum,
- pending_n = PendingN,
- running_n = RunningN,
- crashed_n = CrashedN
- } = AccR,
- PendingAvg = avg(PendingSum, PendingN),
- RunningAvg = avg(RunningSum, RunningN),
- CrashedAvg = avg(CrashedSum, CrashedN),
- couch_stats:update_gauge([couch_replicator, jobs, pending], PendingN),
- couch_stats:update_gauge([couch_replicator, jobs, running], RunningN),
- couch_stats:update_gauge([couch_replicator, jobs, crashed], CrashedN),
- couch_stats:update_gauge([couch_replicator, jobs, avg_pending], PendingAvg),
- couch_stats:update_gauge([couch_replicator, jobs, avg_running], RunningAvg),
- couch_stats:update_gauge([couch_replicator, jobs, avg_crashed], CrashedAvg),
- ok.
-
-
--spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}.
-stats_fold(#job{pid = undefined, history = [{added, T}]}, Acc) ->
- #stats_acc{now = Now, pending_t = SumT, pending_n = Cnt} = Acc,
- Dt = round(timer:now_diff(Now, T) / 1000000),
- Acc#stats_acc{pending_t = SumT + Dt, pending_n = Cnt + 1};
-stats_fold(#job{pid = undefined, history = [{stopped, T} | _]}, Acc) ->
- #stats_acc{now = Now, pending_t = SumT, pending_n = Cnt} = Acc,
- Dt = round(timer:now_diff(Now, T) / 1000000),
- Acc#stats_acc{pending_t = SumT + Dt, pending_n = Cnt + 1};
-stats_fold(#job{pid = undefined, history = [{{crashed, _}, T} | _]}, Acc) ->
- #stats_acc{now = Now, crashed_t = SumT, crashed_n = Cnt} = Acc,
- Dt = round(timer:now_diff(Now, T) / 1000000),
- Acc#stats_acc{crashed_t = SumT + Dt, crashed_n = Cnt + 1};
-stats_fold(#job{pid = P, history = [{started, T} | _]}, Acc) when is_pid(P) ->
- #stats_acc{now = Now, running_t = SumT, running_n = Cnt} = Acc,
- Dt = round(timer:now_diff(Now, T) / 1000000),
- Acc#stats_acc{running_t = SumT + Dt, running_n = Cnt + 1}.
-
-
--spec avg(Sum :: non_neg_integer(), N :: non_neg_integer()) ->
- non_neg_integer().
-avg(_Sum, 0) ->
- 0;
-
-avg(Sum, N) when N > 0 ->
- round(Sum / N).
-
-
-spec ejson_url(#httpdb{} | binary()) -> binary().
ejson_url(#httpdb{}=Httpdb) ->
couch_util:url_strip_password(Httpdb#httpdb.url);
@@ -901,6 +849,69 @@ optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
end.
+% The stats updater is a separate process. It receives `update_stats` messages
+% from the scheduler and refreshes scheduler stats from the scheduler jobs
+% table. Refreshes run no more often than once per ?STATS_UPDATE_WAIT msec.
+
+update_running_jobs_stats(StatsPid) when is_pid(StatsPid) ->
+ StatsPid ! update_stats,
+ ok.
+
+
+stats_updater_loop() ->
+ receive
+ update_stats ->
+ erlang:send_after(?STATS_UPDATE_WAIT, self(), refresh_stats),
+ stats_updater_pending_refresh_loop();
+ OtherMsg ->
+ % Only accept update_stats; discard stray messages and log them as errors
+ couch_log:error("~p Invalid message received by stats updater ~p",
+ [?MODULE, OtherMsg]),
+ stats_updater_loop()
+ end.
+
+
+stats_updater_pending_refresh_loop() ->
+ receive
+ refresh_stats ->
+ ok = stats_updater_refresh(),
+ stats_updater_loop();
+ update_stats ->
+ % Ignore update_stats here, refresh already scheduled
+ stats_updater_pending_refresh_loop();
+ OtherMsg ->
+ % Any other messages are logged as errors, as they are not expected
+ couch_log:error("~p Invalid message received by stats updater ~p",
+ [?MODULE, OtherMsg]),
+ stats_updater_pending_refresh_loop()
+ end.
+
+
+-spec stats_updater_refresh() -> ok.
+stats_updater_refresh() ->
+ #stats_acc{
+ pending_n = PendingN,
+ running_n = RunningN,
+ crashed_n = CrashedN
+ } = ets:foldl(fun stats_fold/2, #stats_acc{}, ?MODULE),
+ couch_stats:update_gauge([couch_replicator, jobs, pending], PendingN),
+ couch_stats:update_gauge([couch_replicator, jobs, running], RunningN),
+ couch_stats:update_gauge([couch_replicator, jobs, crashed], CrashedN),
+ ok.
+
+
+-spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}.
+stats_fold(#job{pid = undefined, history = [{added, _}]}, Acc) ->
+ Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
+stats_fold(#job{pid = undefined, history = [{stopped, _} | _]}, Acc) ->
+ Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
+stats_fold(#job{pid = undefined, history = [{{crashed, _}, _} | _]}, Acc) ->
+ Acc#stats_acc{crashed_n =Acc#stats_acc.crashed_n + 1};
+stats_fold(#job{pid = P, history = [{started, _} | _]}, Acc) when is_pid(P) ->
+ Acc#stats_acc{running_n = Acc#stats_acc.running_n + 1}.
+
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -1251,7 +1262,7 @@ t_if_transient_job_crashes_it_gets_removed() ->
},
setup_jobs([Job]),
?assertEqual(1, ets:info(?MODULE, size)),
- State = #state{max_history = 3},
+ State = #state{max_history = 3, stats_pid = self()},
{noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
State),
?assertEqual(0, ets:info(?MODULE, size))
@@ -1269,7 +1280,7 @@ t_if_permanent_job_crashes_it_stays_in_ets() ->
},
setup_jobs([Job]),
?assertEqual(1, ets:info(?MODULE, size)),
- State = #state{max_jobs =1, max_history = 3},
+ State = #state{max_jobs =1, max_history = 3, stats_pid = self()},
{noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
State),
?assertEqual(1, ets:info(?MODULE, size)),
@@ -1331,14 +1342,16 @@ mock_state(MaxJobs) ->
#state{
max_jobs = MaxJobs,
max_churn = ?DEFAULT_MAX_CHURN,
- max_history = ?DEFAULT_MAX_HISTORY
+ max_history = ?DEFAULT_MAX_HISTORY,
+ stats_pid = self()
}.
mock_state(MaxJobs, MaxChurn) ->
#state{
max_jobs = MaxJobs,
max_churn = MaxChurn,
- max_history = ?DEFAULT_MAX_HISTORY
+ max_history = ?DEFAULT_MAX_HISTORY,
+ stats_pid = self()
}.
--
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <co...@couchdb.apache.org>'].