You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/04/11 01:10:41 UTC

[couchdb] branch 63012-scheduler updated: [fixup] optimize stats updater in scheduler

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

The following commit(s) were added to refs/heads/63012-scheduler by this push:
       new  40b0fbd   [fixup] optimize stats updater in scheduler
40b0fbd is described below

commit 40b0fbdad7b72031a1898339391f4ce4fa0ca216
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon Apr 10 21:10:32 2017 -0400

    [fixup] optimize stats updater in scheduler
---
 src/couch_replicator/elvis.config                  |  25 +++
 src/couch_replicator/priv/stats_descriptions.cfg   |  12 --
 .../src/couch_replicator_scheduler.erl             | 167 +++++++++++----------
 3 files changed, 115 insertions(+), 89 deletions(-)

diff --git a/src/couch_replicator/elvis.config b/src/couch_replicator/elvis.config
new file mode 100644
index 0000000..efacbb7
--- /dev/null
+++ b/src/couch_replicator/elvis.config
@@ -0,0 +1,25 @@
+[
+ {
+   elvis,
+   [
+    {config,
+     [#{dirs => ["src"],
+        filter => "*.erl",
+        rules => [
+        {elvis_style, line_length, #{limit => 80}},
+        {elvis_style, no_tabs, disable},
+        {elvis_style, macro_names, disable},
+        {elvis_style, operator_spaces, disable},
+        {elvis_style, nesting_level, disable},
+        {elvis_style, no_if_expression, disable},
+        {elvis_style, no_nested_try_catch, disable},
+        {elvis_style, state_record_and_type, disable},
+        {elvis_style, no_spec_with_records, disable}
+        ],
+        ruleset => erl_files
+       }
+     ]
+    }
+   ]
+ }
+].
diff --git a/src/couch_replicator/priv/stats_descriptions.cfg b/src/couch_replicator/priv/stats_descriptions.cfg
index 777b290..627da79 100644
--- a/src/couch_replicator/priv/stats_descriptions.cfg
+++ b/src/couch_replicator/priv/stats_descriptions.cfg
@@ -126,18 +126,6 @@
     {type, gauge},
     {desc, <<"total number of replicator scheduler jobs">>}
 ]}.
-{[couch_replicator, jobs, avg_running], [
-    {type, gauge},
-    {desc, <<"average of length of time current jobs have been running">>}
-]}.
-{[couch_replicator, jobs, avg_pending], [
-    {type, gauge},
-    {desc, <<"average length of time spent waiting to run">>}
-]}.
-{[couch_replicator, jobs, avg_crashed], [
-    {type, gauge},
-    {desc, <<"average of length of time since last crash">>}
-]}.
 {[couch_replicator, connection, acquires], [
     {type, counter},
     {desc, <<"number of times connections are shared">>}
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index ce17e58..2279b90 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -63,6 +63,7 @@
 -define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000).
 -define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60).
 -define(RELISTEN_DELAY, 5000).
+-define(STATS_UPDATE_WAIT, 5000).
 
 -define(DEFAULT_MAX_JOBS, 500).
 -define(DEFAULT_MAX_CHURN, 20).
@@ -70,22 +71,20 @@
 -define(DEFAULT_SCHEDULER_INTERVAL, 60000).
 
 
--record(state, {interval, timer, max_jobs, max_churn, max_history}).
+-record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}).
 -record(job, {
-          id :: job_id() | '$1' | '_',
-          rep :: #rep{} | '_',
-          pid :: undefined | pid() | '$1' | '_',
-          monitor :: undefined | reference() | '_',
-          history :: history() | '_'}).
+    id :: job_id() | '$1' | '_',
+    rep :: #rep{} | '_',
+    pid :: undefined | pid() | '$1' | '_',
+    monitor :: undefined | reference() | '_',
+    history :: history() | '_'
+}).
 
 -record(stats_acc, {
-          now  :: erlang:timestamp(),
-          pending_t = 0 :: non_neg_integer(),
-          running_t = 0 :: non_neg_integer(),
-          crashed_t = 0 :: non_neg_integer(),
-          pending_n = 0 :: non_neg_integer(),
-          running_n = 0 :: non_neg_integer(),
-          crashed_n = 0 :: non_neg_integer()}).
+    pending_n = 0 :: non_neg_integer(),
+    running_n = 0 :: non_neg_integer(),
+    crashed_n = 0 :: non_neg_integer()
+}).
 
 
 %% public functions
@@ -207,7 +206,8 @@ init(_) ->
         max_jobs = MaxJobs,
         max_churn = MaxChurn,
         max_history = MaxHistory,
-        timer = Timer
+        timer = Timer,
+        stats_pid = spawn_link(fun stats_updater_loop/0)
     },
     {ok, State}.
 
@@ -267,7 +267,7 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
     {ok, Job} = job_by_pid(Pid),
     couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
     remove_job_int(Job),
-    update_running_jobs_stats(),
+    update_running_jobs_stats(State#state.stats_pid),
     {noreply, State};
 
 handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
@@ -340,12 +340,12 @@ handle_config_terminate(_, _, _) ->
 % in the scheduler will be retried indefinitely (with appropriate exponential
 % backoffs).
 -spec handle_crashed_job(#job{}, any(), #state{}) -> ok.
-handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, _State) ->
+handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, State) ->
     Msg = "~p : Transient job ~p failed, removing. Error: ~p",
     ErrorBinary = couch_replicator_utils:rep_error_to_binary(Reason),
     couch_log:error(Msg, [?MODULE, Job#job.id, ErrorBinary]),
     remove_job_int(Job),
-    update_running_jobs_stats(),
+    update_running_jobs_stats(State#state.stats_pid),
     ok;
 
 handle_crashed_job(Job, Reason, State) ->
@@ -363,7 +363,7 @@ handle_crashed_job(Job, Reason, State) ->
             % scheduler could be blocked if there is a cascade of lots failing
             % jobs in a row.
             start_pending_jobs(State),
-            update_running_jobs_stats(),
+            update_running_jobs_stats(State#state.stats_pid),
             ok;
         false ->
             ok
@@ -380,7 +380,7 @@ maybe_start_newly_added_job(Job, State) ->
     case TotalJobs < MaxJobs andalso running_job_count() < MaxJobs of
         true ->
             start_job_int(Job, State),
-            update_running_jobs_stats(),
+            update_running_jobs_stats(State#state.stats_pid),
             ok;
         false ->
             ok
@@ -548,7 +548,7 @@ maybe_remove_job_int(JobId, State) ->
             TotalJobs = ets:info(?MODULE, size),
             couch_stats:update_gauge([couch_replicator, jobs, total],
                 TotalJobs),
-            update_running_jobs_stats(),
+            update_running_jobs_stats(State#state.stats_pid),
             ok;
         {error, not_found} ->
             ok
@@ -681,7 +681,7 @@ reschedule(State) ->
     stop_excess_jobs(State, Running),
     start_pending_jobs(State, Running, Pending),
     rotate_jobs(State, Running, Pending),
-    update_running_jobs_stats(),
+    update_running_jobs_stats(State#state.stats_pid),
     ok.
 
 
@@ -742,58 +742,6 @@ update_history(Job, Type, When, State) ->
     Job#job{history = History1}.
 
 
--spec update_running_jobs_stats() -> ok.
-update_running_jobs_stats() ->
-    Acc0 = #stats_acc{now = os:timestamp()},
-    AccR = ets:foldl(fun stats_fold/2, Acc0, ?MODULE),
-    #stats_acc{
-        pending_t = PendingSum,
-        running_t = RunningSum,
-        crashed_t = CrashedSum,
-        pending_n = PendingN,
-        running_n = RunningN,
-        crashed_n = CrashedN
-    } = AccR,
-    PendingAvg = avg(PendingSum, PendingN),
-    RunningAvg = avg(RunningSum, RunningN),
-    CrashedAvg = avg(CrashedSum, CrashedN),
-    couch_stats:update_gauge([couch_replicator, jobs, pending], PendingN),
-    couch_stats:update_gauge([couch_replicator, jobs, running], RunningN),
-    couch_stats:update_gauge([couch_replicator, jobs, crashed], CrashedN),
-    couch_stats:update_gauge([couch_replicator, jobs, avg_pending], PendingAvg),
-    couch_stats:update_gauge([couch_replicator, jobs, avg_running], RunningAvg),
-    couch_stats:update_gauge([couch_replicator, jobs, avg_crashed], CrashedAvg),
-    ok.
-
-
--spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}.
-stats_fold(#job{pid = undefined, history = [{added, T}]}, Acc) ->
-    #stats_acc{now = Now, pending_t = SumT, pending_n = Cnt} = Acc,
-    Dt = round(timer:now_diff(Now, T) / 1000000),
-    Acc#stats_acc{pending_t = SumT + Dt, pending_n = Cnt + 1};
-stats_fold(#job{pid = undefined, history = [{stopped, T} | _]}, Acc) ->
-    #stats_acc{now = Now, pending_t = SumT, pending_n = Cnt} = Acc,
-    Dt = round(timer:now_diff(Now, T) / 1000000),
-    Acc#stats_acc{pending_t = SumT + Dt, pending_n = Cnt + 1};
-stats_fold(#job{pid = undefined, history = [{{crashed, _}, T} | _]}, Acc) ->
-    #stats_acc{now = Now, crashed_t = SumT, crashed_n = Cnt} = Acc,
-    Dt = round(timer:now_diff(Now, T) / 1000000),
-    Acc#stats_acc{crashed_t = SumT + Dt, crashed_n = Cnt + 1};
-stats_fold(#job{pid = P, history = [{started, T} | _]}, Acc) when is_pid(P) ->
-    #stats_acc{now = Now, running_t = SumT, running_n = Cnt} = Acc,
-    Dt = round(timer:now_diff(Now, T) / 1000000),
-    Acc#stats_acc{running_t = SumT + Dt, running_n = Cnt + 1}.
-
-
--spec avg(Sum :: non_neg_integer(), N :: non_neg_integer()) ->
-    non_neg_integer().
-avg(_Sum, 0) ->
-    0;
-
-avg(Sum, N) when N > 0 ->
-    round(Sum / N).
-
-
 -spec ejson_url(#httpdb{} | binary()) -> binary().
 ejson_url(#httpdb{}=Httpdb) ->
     couch_util:url_strip_password(Httpdb#httpdb.url);
@@ -901,6 +849,69 @@ optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
     end.
 
 
+% Updater is a separate process. It receives `update_stats` messages and
+% updates scheduler stats from the scheduler jobs table. Updates are
+% performed no more frequently than once per ?STATS_UPDATE_WAIT milliseconds.
+
+update_running_jobs_stats(StatsPid) when is_pid(StatsPid) ->
+    StatsPid ! update_stats,
+    ok.
+
+
+stats_updater_loop() ->
+    receive
+        update_stats ->
+            erlang:send_after(?STATS_UPDATE_WAIT, self(), refresh_stats),
+            stats_updater_pending_refresh_loop();
+        OtherMsg ->
+            % Only accept update_stats, discard stray messages and log as error
+            couch_log:error("~p Invalid message received by stats updater ~p",
+                 [?MODULE, OtherMsg]),
+            stats_updater_loop()
+    end.
+
+
+stats_updater_pending_refresh_loop() ->
+    receive
+        refresh_stats ->
+            ok = stats_updater_refresh(),
+            stats_updater_loop();
+        update_stats ->
+            % Ignore update_stats here, refresh already scheduled
+            stats_updater_pending_refresh_loop();
+        OtherMsg ->
+            % Any other messages are unexpected and are logged as errors
+            couch_log:error("~p Invalid message received by stats updater ~p",
+                [?MODULE, OtherMsg]),
+            stats_updater_pending_refresh_loop()
+    end.
+
+
+-spec stats_updater_refresh() -> ok.
+stats_updater_refresh() ->
+    #stats_acc{
+       pending_n = PendingN,
+       running_n = RunningN,
+       crashed_n = CrashedN
+    } =  ets:foldl(fun stats_fold/2, #stats_acc{}, ?MODULE),
+    couch_stats:update_gauge([couch_replicator, jobs, pending], PendingN),
+    couch_stats:update_gauge([couch_replicator, jobs, running], RunningN),
+    couch_stats:update_gauge([couch_replicator, jobs, crashed], CrashedN),
+    ok.
+
+
+-spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}.
+stats_fold(#job{pid = undefined, history = [{added, _}]}, Acc) ->
+    Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
+stats_fold(#job{pid = undefined, history = [{stopped, _} | _]}, Acc) ->
+    Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
+stats_fold(#job{pid = undefined, history = [{{crashed, _}, _} | _]}, Acc) ->
+    Acc#stats_acc{crashed_n =Acc#stats_acc.crashed_n + 1};
+stats_fold(#job{pid = P, history = [{started, _} | _]}, Acc) when is_pid(P) ->
+    Acc#stats_acc{running_n = Acc#stats_acc.running_n + 1}.
+
+
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
@@ -1251,7 +1262,7 @@ t_if_transient_job_crashes_it_gets_removed() ->
         },
         setup_jobs([Job]),
         ?assertEqual(1, ets:info(?MODULE, size)),
-        State = #state{max_history = 3},
+        State = #state{max_history = 3, stats_pid = self()},
         {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
             State),
         ?assertEqual(0, ets:info(?MODULE, size))
@@ -1269,7 +1280,7 @@ t_if_permanent_job_crashes_it_stays_in_ets() ->
         },
         setup_jobs([Job]),
         ?assertEqual(1, ets:info(?MODULE, size)),
-        State = #state{max_jobs =1, max_history = 3},
+        State = #state{max_jobs =1, max_history = 3, stats_pid = self()},
         {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
             State),
         ?assertEqual(1, ets:info(?MODULE, size)),
@@ -1331,14 +1342,16 @@ mock_state(MaxJobs) ->
     #state{
         max_jobs = MaxJobs,
         max_churn = ?DEFAULT_MAX_CHURN,
-        max_history = ?DEFAULT_MAX_HISTORY
+        max_history = ?DEFAULT_MAX_HISTORY,
+        stats_pid = self()
     }.
 
 mock_state(MaxJobs, MaxChurn) ->
     #state{
         max_jobs = MaxJobs,
         max_churn = MaxChurn,
-        max_history = ?DEFAULT_MAX_HISTORY
+        max_history = ?DEFAULT_MAX_HISTORY,
+        stats_pid = self()
     }.
 
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <co...@couchdb.apache.org>'].