Posted to commits@couchdb.apache.org by va...@apache.org on 2021/02/08 15:52:16 UTC

[couchdb] branch fair-share-scheduler-3.x updated: [wip] move to a separate file

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch fair-share-scheduler-3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/fair-share-scheduler-3.x by this push:
     new 702feba  [wip] move to a separate file
702feba is described below

commit 702febac5dbb3317febff8daad330717615125d0
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon Feb 8 10:50:06 2021 -0500

    [wip] move to a separate file
---
 .../src/couch_replicator_scheduler.erl             | 153 ++------------------
 .../src/couch_replicator_scheduler.hrl             |  11 ++
 .../src/couch_replicator_share.erl                 | 158 +++++++++++++++++++++
 3 files changed, 179 insertions(+), 143 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index bec8d7b..fccda17 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -77,11 +77,6 @@
 -define(DEFAULT_MAX_HISTORY, 20).
 -define(DEFAULT_SCHEDULER_INTERVAL, 60000).
 
--define(DEFAULT_PRIORITY_COEFF, "0.75").
--define(DEFAULT_SHARES, 100).
--define(SHARES, couch_replicator_scheduler_shares).
--define(USAGE, couch_replicator_scheduler_usage).
-
 
 -record(state, {
     interval,
@@ -89,32 +84,15 @@
     max_jobs,
     max_churn,
     max_history,
-    priority_coeff,
     stats_pid
 }).
 
--record(job, {
-    id :: job_id() | '$1' | '_',
-    rep :: #rep{} | '_',
-    pid :: undefined | pid() | '$1' | '_',
-    monitor :: undefined | reference() | '_',
-    history :: history() | '_',
-    usage_key :: undefined | binary() | '_',
-    priority :: float()
-}).
-
 -record(stats_acc, {
     pending_n = 0 :: non_neg_integer(),
     running_n = 0 :: non_neg_integer(),
     crashed_n = 0 :: non_neg_integer()
 }).
 
--record(usage, {
-   key :: binary(),
-   usage = 0 :: non_neg_integer(),
-   num_jobs = 0 :: non_neg_integer(),
-}).
-
 
 %% public functions
 
@@ -127,12 +105,14 @@ start_link() ->
 add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
     case existing_replication(Rep) of
         false ->
+            UsageKey = couch_replicator_share:get_usage_key(Rep),
+            Priority = couch_replicator_share:new_job_priority(Rep),
             Job = #job{
                 id = Rep#rep.id,
                 rep = Rep,
                 history = [{added, os:timestamp()}],
-                usage_key = get_usage_key(Rep),
-                priority = new_job_priority(Rep)
+                usage_key = UsageKey,
+                priority = Priority
             },
             gen_server:call(?MODULE, {add_job, Job}, infinity);
         true ->
@@ -260,9 +240,6 @@ init(_) ->
     EtsOpts = [named_table, {read_concurrency, true},
         {write_concurrency, true}],
     ?MODULE = ets:new(?MODULE, EtsOpts ++ [{keypos, #job.id}]),
-    ?SHARES = ets:new(?MODULE, EtsOpts),
-    ?STOPPED_USAGE = ets:new(?STOPPED_USAGE, EtsOpts),
-    ?USAGE = ?USAGE = ets:new(?USAGE, EtsOpts),
     ok = config:listen_for_changes(?MODULE, nil),
     Interval = config:get_integer("replicator", "interval",
         ?DEFAULT_SCHEDULER_INTERVAL),
@@ -271,16 +248,13 @@ init(_) ->
         ?DEFAULT_MAX_CHURN),
     MaxHistory = config:get_integer("replicator", "max_history",
         ?DEFAULT_MAX_HISTORY),
-    PriorityCoeff = list_to_float(config:get("replicator", "priority_coeff",
-        ?DEFAULT_PRIORITY_COEFF)),
     Timer = erlang:send_after(Interval, self(), reschedule),
-    ok = init_shares(),
+    ok = couch_replicator_share:init(),
     State = #state{
         interval = Interval,
         max_jobs = MaxJobs,
         max_churn = MaxChurn,
         max_history = MaxHistory,
-        priority_coeff = PriorityCoeff,
         timer = Timer,
         stats_pid = start_stats_updater()
     },
@@ -328,15 +302,10 @@ handle_cast({set_interval, Interval}, State) when is_integer(Interval),
     couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
     {noreply, State#state{interval = Interval}};
 
-handle_cast({set_priority_coeff, Coeff}, State) when is_float(Coeff),
-        Coeff > 0 ->
-    couch_log:notice("~p: priority_coeff set to ~B", [?MODULE, Coeff]),
-    {noreply, State#state{priority_coeff = Coeff}};
-
 handle_cast({update_shares, Key, Shares}, State) when is_binary(Key),
         is_integer(Shares), Shares >= 0 ->
     couch_log:notice("~p: shares for ~s set to ~B", [?MODULE, Key, Shares]),
-    ok = update_shares(Key, Shares),
+    ok = couch_replicator_share:update_shares(Key, Shares),
     {noreply, State};
 
 
@@ -364,7 +333,7 @@ handle_info(reschedule, State) ->
 handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
     {ok, Job} = job_by_pid(Pid),
     couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
-    ok = add_stopped_job_usage(Job, os:timestamp()),
+    ok = couch_replicator_share:add_stopped_job_usage(Job, os:timestamp()),
     remove_job_int(Job),
     update_running_jobs_stats(State#state.stats_pid),
     {noreply, State};
@@ -375,7 +344,7 @@ handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
         {shutdown, ShutdownReason} -> ShutdownReason;
         Other -> Other
     end,
-    ok = add_stopped_job_usage(Job, os:timestamp()),
+    ok = couch_replicator_share:add_stopped_job_usage(Job, os:timestamp()),
     ok = handle_crashed_job(Job, Reason, State),
     {noreply, State};
 
@@ -421,10 +390,6 @@ handle_config_change("replicator", "max_history", V, _, S) ->
     ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}),
     {ok, S};
 
-handle_config_change("replicator", "priority_coeff", V, _, S) ->
-    ok = gen_server:cast(?MODULE, {set_priority_coeff, list_to_float(V)}),
-    {ok, S};
-
 handle_config_change("replicator.shares", Key, V, _, S) ->
     ok = gen_server:cast(?MODULE, {update_shares, list_to_binary(Key),
         list_to_integer(V)}),
@@ -661,7 +626,8 @@ add_job_int(#job{} = Job) ->
 maybe_remove_job_int(JobId, State) ->
     case job_by_id(JobId) of
         {ok, Job} ->
-            ok = add_stopped_job_usage(Job, os:timestamp()),
+            Now = os:timestamp(),
+            ok = couch_replicator_share:add_stopped_job_usage(Job, Now),
             ok = stop_job_int(Job, State),
             true = remove_job_int(Job),
             couch_stats:increment_counter([couch_replicator, jobs, removes]),
@@ -1033,105 +999,6 @@ existing_replication(#rep{} = NewRep) ->
     end.
 
 
-init_shares() ->
-    lists:foreach(fun({K, V}) ->
-        ok = update_shares(list_to_binary(K), list_to_integer(V))
-    end, config:get("replicator.shares").
-
-
-update_shares(Key, Shares) when is_binary(Db), is_integer(Shares) ->
-    true = ets:insert(?SHARES, {Key, Shares}),
-    ok.
-
-
-get_usage_key(#rep{} = Rep) ->
-    case is_binary(Rep#rep.db_name) of
-        true -> Rep#rep.db_name;
-        false -> (Rep#rep.user_ctx)#user_ctx.name
-    end.
-
-
-% Job usage is defined as number of milliseconds it was running since the last
-% time it started.
-get_job_usage(#job{} = Now, {_, _, _} = Now) ->
-    timer:now_diff(Now, last_started(Job)) div 1000.
-
-
-new_job_priority(#rep{} = Rep) ->
-    0.0. % get average priority here so new jobs don't monopolize the scheduler
-
-
-get_usage(Db) when is_binary(Db) ->
-    0.
-
-
-get_shares(Db) when is_binary(Db) ->
-    0.
-
-
-get_db_num_jobs(Db) when is_binary(Db) ->
-    0.
-
-
-% Accumulate usage from jobs which stop in the middle of the sheduling cycle.
-% The accumulated value is then summed up with all the running job usage at the
-% start of each scheduling cycle.
-%
-add_stopped_job_usage(#job{pid = undefined}, {_, _, _} = _Now) ->
-    % Only accumulate usage from jobs which were running. If a stopped jobs is
-    % removed we ignore it here.
-    ok;
-
-add_stopped_job_usage(#job{} = Job, {_, _, _} = Now) ->
-    Key = Job#job.usage_key,
-    Usage = get_job_usage(Job, Now),
-    ets:update_counter(?STOPPED_USAGE, Key, Usage, {Key, 0}),
-    ok.
-
-update_usage({_, _, } = Now) ->
-    StoppedUsage = ets:foldl(fun({Key, Val}, Acc) ->
-        Acc#{Key := Val}
-    end, #{}, ?STOPPED_USAGE),
-
-    % Next cycle start with a clean stopped usage table
-    true = ets:delete_all_objects(?STOPPED_USAGE),
-
-    % Get usage from all the running jobs and per-db job counts. Running job
-    % usage is added to the already computed stopped job usage.
-    {Usage, NumJobs} = ets:foldl(fun(Job, {UsgAcc, CntAcc}) ->
-        Key = Job#job.usage_key,
-        CntFun = fun(Val) -> Val + 1 end,
-        CntAcc1 = maps:update_with(Key, CntFun, 0, CntAcc),
-        UsgAcc1 = case is_pid(Job#job.pid) of
-            true ->
-                UsageInc = get_job_usage(Job, Now),
-                UsgFun = fun(Val) -> Val + UsageInc end,
-                maps:update_with(Key, UsgFun, 0, UsgAcc);
-            false ->
-                UsgAcc
-        end,
-        {CntAcc1, UsgAcc1}
-    end, {StoppedUsage, #{}}, ?MODULE),
-
-    % Clear the table first. This will usage stats from accounts which are not
-    % longer running or haven't just stopped running their jobs in the last
-    % scheduling cycle.
-    true = ets:delete_all_objects(?USAGE),
-
-    maps:fold(fun(Key, Usage, ok) ->
-        JobCount = maps:get(Key, NumJobs, 0)
-        true = ets:insert(?USAGE, {Key, Usage, maps:get(Key, NumJobs, 0)})
-    end, ok, Usage).
-
-
-decay_priorities(PriorityCoeff) ->
-    ok.
-
-
-adjust_priorities() ->
-    ok.
-
-
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.hrl b/src/couch_replicator/src/couch_replicator_scheduler.hrl
index 5203b0c..6b88279 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.hrl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.hrl
@@ -13,3 +13,14 @@
 
 -type job_id() :: term().
 -type job_args() :: term().
+
+
+-record(job, {
+    id :: job_id() | '$1' | '_',
+    rep :: #rep{} | '_',
+    pid :: undefined | pid() | '$1' | '_',
+    monitor :: undefined | reference() | '_',
+    history :: history() | '_',
+    usage_key :: undefined | binary() | '_',
+    priority :: float()
+}).
diff --git a/src/couch_replicator/src/couch_replicator_share.erl b/src/couch_replicator/src/couch_replicator_share.erl
new file mode 100644
index 0000000..08189c5
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_share.erl
@@ -0,0 +1,158 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_share).
+
+-export([
+    init/0,
+
+    update_shares/2,
+
+    job_added/1,
+    job_removed/1,
+
+    get_usage_key/1,
+    new_job_priority/1,
+    add_stopped_job_usage/2,
+
+    update_usage/2,
+    decay_all_priorities/1,
+    adjust_running_priorities/1
+]).
+
+
+-include("couch_replicator.hrl").
+-include("couch_replicator_scheduler.hrl").
+
+
+-define(DEFAULT_USAGE_COEFF, "0.2").
+-define(DEFAULT_PRIORITY_COEFF, "0.8").
+-define(DEFAULT_SHARES, 100).
+-define(SHARES, couch_replicator_shares).
+-define(USAGE, couch_replicator_usage).
+-define(NUM_JOBS, couch_replicator_num_jobs).
+-define(STOPPED_USAGE, couch_replicator_stopped_usage).
+
+
+init() ->
+    EtsOpts = [named_table],
+    ?SHARES = ets:new(?SHARES, EtsOpts),
+    ?STOPPED_USAGE = ets:new(?STOPPED_USAGE, EtsOpts),
+    ?USAGE = ets:new(?USAGE, EtsOpts),
+    ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts),
+    lists:foreach(fun({K, V}) ->
+        ok = update_shares(list_to_binary(K), list_to_integer(V))
+    end, config:get("replicator.shares")).
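+
+% As a rough illustration (section contents hypothetical): a configuration
+% such as
+%
+%   [replicator.shares]
+%   _replicator = 50
+%   important_db = 200
+%
+% would seed ?SHARES with {<<"_replicator">>, 50} and {<<"important_db">>, 200}
+% via update_shares/2 at init time.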
+
+
+update_shares(Key, Shares) when is_binary(Key), is_integer(Shares) ->
+    true = ets:insert(?SHARES, {Key, Shares}),
+    ok.
+
+
+job_added(#job{} = Job) ->
+    Key = Job#job.usage_key,
+    ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}).
+
+
+job_removed(#job{} = Job) ->
+    Key = Job#job.usage_key,
+    case ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}) of
+        N when is_integer(N), N =< 0 ->
+            true = ets:delete(?NUM_JOBS, Key),
+            ok;
+        N when is_integer(N), N > 0 ->
+            ok
+    end.
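+
+% For example (key hypothetical): after three calls to job_added/1 for jobs
+% sharing usage_key <<"db1">>, ?NUM_JOBS holds {<<"db1">>, 3}; removing the
+% last of those jobs deletes the row entirely.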
+
+
+get_usage_key(#rep{} = Rep) ->
+    case is_binary(Rep#rep.db_name) of
+        true -> Rep#rep.db_name;
+        false -> (Rep#rep.user_ctx)#user_ctx.name
+    end.
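+
+% For example: a job defined by a document in a replicator database is charged
+% to that database's name, while a transient job with no db_name is charged to
+% the user name from its #user_ctx{}, e.g. <<"bob">> (hypothetical).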
+
+
+new_job_priority(#rep{} = _Rep) ->
+    0.0. % TODO: use an average priority so new jobs don't monopolize the scheduler
+
+
+% Accumulate usage from jobs which stop in the middle of the scheduling
+% cycle. The accumulated value is then summed with the usage of all the
+% running jobs at the start of each scheduling cycle.
+%
+add_stopped_job_usage(#job{pid = undefined}, {_, _, _} = _Now) ->
+    % Only accumulate usage from jobs which were running. If a stopped job is
+    % removed, it is ignored here.
+    ok;
+
+add_stopped_job_usage(#job{} = Job, {_, _, _} = Now) ->
+    Key = Job#job.usage_key,
+    Usage = get_job_usage(Job, Now),
+    ets:update_counter(?STOPPED_USAGE, Key, Usage, {Key, 0}),
+    ok.
+
+
+update_usage(Ets, {_, _, _} = Now) ->
+    StoppedUsage = ets:foldl(fun({Key, Val}, Acc) ->
+        Acc#{Key => Val}
+    end, #{}, ?STOPPED_USAGE),
+
+    % The next cycle starts with a clean stopped usage table
+    true = ets:delete_all_objects(?STOPPED_USAGE),
+
+    % Get usage from all the running jobs. Running job usage is added to the
+    % already accumulated stopped job usage.
+    Usage = lists:foldl(fun(Job, Acc) ->
+        Key = Job#job.usage_key,
+        JobUsage = get_job_usage(Job, Now),
+        Fun = fun(Val) -> Val + JobUsage end,
+        maps:update_with(Key, Fun, JobUsage, Acc)
+    end, StoppedUsage, running_jobs(Ets)),
+
+    % Clear the usage table first, so that stale stats from accounts which no
+    % longer have running jobs and didn't stop any jobs during the last
+    % scheduling cycle are removed.
+    true = ets:delete_all_objects(?USAGE),
+
+    maps:fold(fun(Key, KeyUsage, ok) ->
+        true = ets:insert(?USAGE, {Key, KeyUsage}),
+        ok
+    end, ok, Usage).
+
+
+decay_all_priorities(_Ets) ->
+    ok.
+
+
+adjust_running_priorities(_Ets) ->
+    ok.
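+
+% A sketch of how the scheduler could drive this module once per scheduling
+% cycle. The atom couch_replicator_scheduler is the scheduler's job ets table
+% (it creates a named table ?MODULE in its init/1); the exact call site is not
+% part of this patch:
+%
+%   SchedulerJobs = couch_replicator_scheduler,
+%   Now = os:timestamp(),
+%   ok = couch_replicator_share:update_usage(SchedulerJobs, Now),
+%   ok = couch_replicator_share:decay_all_priorities(SchedulerJobs),
+%   ok = couch_replicator_share:adjust_running_priorities(SchedulerJobs).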
+
+
+running_jobs(Ets) ->
+    ets:select(Ets, [{#job{pid = '$1', _='_'}, [{is_pid, '$1'}], ['$_']}]).
+
+
+% Job usage is defined as number of milliseconds it was running since the last
+% time it started.
+get_job_usage(#job{} = Job, {_, _, _} = Now) ->
+    timer:now_diff(Now, last_started(Job)) div 1000.
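+
+% A minimal worked example (timestamps hypothetical): if a job last started at
+% T0 and the current time is Now = T0 + 5 seconds, timer:now_diff(Now, T0)
+% returns 5000000 microseconds, so get_job_usage/2 returns 5000 ms. When such
+% a job stops, add_stopped_job_usage/2 adds those 5000 ms to its usage_key's
+% counter in ?STOPPED_USAGE.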
+
+
+priority_coeff() ->
+    Default = ?DEFAULT_PRIORITY_COEFF,
+    StrVal = config:get("replicator", "priority_coeff", Default),
+    list_to_float(StrVal).
+
+
+usage_coeff() ->
+    Default = ?DEFAULT_USAGE_COEFF,
+    StrVal = config:get("replicator", "usage_coeff", Default),
+    list_to_float(StrVal).
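+
+% With the defaults above, priority_coeff() returns 0.8 and usage_coeff()
+% returns 0.2. Note that list_to_float/1 requires a decimal point, so an
+% operator setting these values would need to write e.g. "1.0" rather than "1".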
+