You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2021/02/08 15:52:16 UTC
[couchdb] branch fair-share-scheduler-3.x updated: [wip] move to a separate file
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch fair-share-scheduler-3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/fair-share-scheduler-3.x by this push:
new 702feba [wip] move to a separate file
702feba is described below
commit 702febac5dbb3317febff8daad330717615125d0
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon Feb 8 10:50:06 2021 -0500
[wip] move to a separate file
---
.../src/couch_replicator_scheduler.erl | 153 ++------------------
.../src/couch_replicator_scheduler.hrl | 11 ++
.../src/couch_replicator_share.erl | 158 +++++++++++++++++++++
3 files changed, 179 insertions(+), 143 deletions(-)
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index bec8d7b..fccda17 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -77,11 +77,6 @@
-define(DEFAULT_MAX_HISTORY, 20).
-define(DEFAULT_SCHEDULER_INTERVAL, 60000).
--define(DEFAULT_PRIORITY_COEFF, "0.75").
--define(DEFAULT_SHARES, 100).
--define(SHARES, couch_replicator_scheduler_shares).
--define(USAGE, couch_replicator_scheduler_usage).
-
-record(state, {
interval,
@@ -89,32 +84,15 @@
max_jobs,
max_churn,
max_history,
- priority_coeff,
stats_pid
}).
--record(job, {
- id :: job_id() | '$1' | '_',
- rep :: #rep{} | '_',
- pid :: undefined | pid() | '$1' | '_',
- monitor :: undefined | reference() | '_',
- history :: history() | '_',
- usage_key :: undefined | binary() | '_',
- priority :: float()
-}).
-
-record(stats_acc, {
pending_n = 0 :: non_neg_integer(),
running_n = 0 :: non_neg_integer(),
crashed_n = 0 :: non_neg_integer()
}).
--record(usage, {
- key :: binary(),
- usage = 0 :: non_neg_integer(),
- num_jobs = 0 :: non_neg_integer(),
-}).
-
%% public functions
@@ -127,12 +105,14 @@ start_link() ->
add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
case existing_replication(Rep) of
false ->
+ UsageKey = couch_replicator_share:get_usage_key(Rep),
+ Priority = couch_replicator_share:new_job_priority(Rep),
Job = #job{
id = Rep#rep.id,
rep = Rep,
history = [{added, os:timestamp()}],
- usage_key = get_usage_key(Rep),
- priority = new_job_priority(Rep)
+ usage_key = UsageKey,
+ priority = Priority
},
gen_server:call(?MODULE, {add_job, Job}, infinity);
true ->
@@ -260,9 +240,6 @@ init(_) ->
EtsOpts = [named_table, {read_concurrency, true},
{write_concurrency, true}],
?MODULE = ets:new(?MODULE, EtsOpts ++ [{keypos, #job.id}]),
- ?SHARES = ets:new(?MODULE, EtsOpts),
- ?STOPPED_USAGE = ets:new(?STOPPED_USAGE, EtsOpts),
- ?USAGE = ?USAGE = ets:new(?USAGE, EtsOpts),
ok = config:listen_for_changes(?MODULE, nil),
Interval = config:get_integer("replicator", "interval",
?DEFAULT_SCHEDULER_INTERVAL),
@@ -271,16 +248,13 @@ init(_) ->
?DEFAULT_MAX_CHURN),
MaxHistory = config:get_integer("replicator", "max_history",
?DEFAULT_MAX_HISTORY),
- PriorityCoeff = list_to_float(config:get("replicator", "priority_coeff",
- ?DEFAULT_PRIORITY_COEFF)),
Timer = erlang:send_after(Interval, self(), reschedule),
- ok = init_shares(),
+ ok = couch_replicator_share:init(),
State = #state{
interval = Interval,
max_jobs = MaxJobs,
max_churn = MaxChurn,
max_history = MaxHistory,
- priority_coeff = PriorityCoeff,
timer = Timer,
stats_pid = start_stats_updater()
},
@@ -328,15 +302,10 @@ handle_cast({set_interval, Interval}, State) when is_integer(Interval),
couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
{noreply, State#state{interval = Interval}};
-handle_cast({set_priority_coeff, Coeff}, State) when is_float(Coeff),
- Coeff > 0 ->
- couch_log:notice("~p: priority_coeff set to ~B", [?MODULE, Coeff]),
- {noreply, State#state{priority_coeff = Coeff}};
-
handle_cast({update_shares, Key, Shares}, State) when is_binary(Db),
is_integer(Shares), Shares >= 0 ->
couch_log:notice("~ shares for ~S set to ~B", [?MODULE, Key, Shares]),
- ok = update_shares(Key, Shares),
+ ok = couch_replicator_share:update_shares(Key, Shares),
{noreply, State};
@@ -364,7 +333,7 @@ handle_info(reschedule, State) ->
handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
{ok, Job} = job_by_pid(Pid),
couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
- ok = add_stopped_job_usage(Job, os:timestamp()),
+ ok = couch_replicator_share:add_stopped_job_usage(Job, os:timestamp()),
remove_job_int(Job),
update_running_jobs_stats(State#state.stats_pid),
{noreply, State};
@@ -375,7 +344,7 @@ handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
{shutdown, ShutdownReason} -> ShutdownReason;
Other -> Other
end,
- ok = add_stopped_job_usage(Job, os:timestamp()),
+ ok = couch_replicator_share:add_stopped_job_usage(Job, os:timestamp()),
ok = handle_crashed_job(Job, Reason, State),
{noreply, State};
@@ -421,10 +390,6 @@ handle_config_change("replicator", "max_history", V, _, S) ->
ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}),
{ok, S};
-handle_config_change("replicator", "priority_coeff", V, _, S) ->
- ok = gen_server:cast(?MODULE, {set_priority_coeff, list_to_float(V)}),
- {ok, S};
-
handle_config_change("replicator.shares", Key, V, _, S) ->
ok = gen_server:cast(?MODULE, {update_shares, list_to_binary(Key),
list_to_integer(V)}),
@@ -661,7 +626,8 @@ add_job_int(#job{} = Job) ->
maybe_remove_job_int(JobId, State) ->
case job_by_id(JobId) of
{ok, Job} ->
- ok = add_stopped_job_usage(Job, os:timestamp()),
+ Now = os:timestamp(),
+ ok = couch_replicator_share:add_stopped_job_usage(Job, Now),
ok = stop_job_int(Job, State),
true = remove_job_int(Job),
couch_stats:increment_counter([couch_replicator, jobs, removes]),
@@ -1033,105 +999,6 @@ existing_replication(#rep{} = NewRep) ->
end.
-init_shares() ->
- lists:foreach(fun({K, V}) ->
- ok = update_shares(list_to_binary(K), list_to_integer(V))
- end, config:get("replicator.shares").
-
-
-update_shares(Key, Shares) when is_binary(Db), is_integer(Shares) ->
- true = ets:insert(?SHARES, {Key, Shares}),
- ok.
-
-
-get_usage_key(#rep{} = Rep) ->
- case is_binary(Rep#rep.db_name) of
- true -> Rep#rep.db_name;
- false -> (Rep#rep.user_ctx)#user_ctx.name
- end.
-
-
-% Job usage is defined as number of milliseconds it was running since the last
-% time it started.
-get_job_usage(#job{} = Now, {_, _, _} = Now) ->
- timer:now_diff(Now, last_started(Job)) div 1000.
-
-
-new_job_priority(#rep{} = Rep) ->
- 0.0. % get average priority here so new jobs don't monopolize the scheduler
-
-
-get_usage(Db) when is_binary(Db) ->
- 0.
-
-
-get_shares(Db) when is_binary(Db) ->
- 0.
-
-
-get_db_num_jobs(Db) when is_binary(Db) ->
- 0.
-
-
-% Accumulate usage from jobs which stop in the middle of the sheduling cycle.
-% The accumulated value is then summed up with all the running job usage at the
-% start of each scheduling cycle.
-%
-add_stopped_job_usage(#job{pid = undefined}, {_, _, _} = _Now) ->
- % Only accumulate usage from jobs which were running. If a stopped jobs is
- % removed we ignore it here.
- ok;
-
-add_stopped_job_usage(#job{} = Job, {_, _, _} = Now) ->
- Key = Job#job.usage_key,
- Usage = get_job_usage(Job, Now),
- ets:update_counter(?STOPPED_USAGE, Key, Usage, {Key, 0}),
- ok.
-
-update_usage({_, _, } = Now) ->
- StoppedUsage = ets:foldl(fun({Key, Val}, Acc) ->
- Acc#{Key := Val}
- end, #{}, ?STOPPED_USAGE),
-
- % Next cycle start with a clean stopped usage table
- true = ets:delete_all_objects(?STOPPED_USAGE),
-
- % Get usage from all the running jobs and per-db job counts. Running job
- % usage is added to the already computed stopped job usage.
- {Usage, NumJobs} = ets:foldl(fun(Job, {UsgAcc, CntAcc}) ->
- Key = Job#job.usage_key,
- CntFun = fun(Val) -> Val + 1 end,
- CntAcc1 = maps:update_with(Key, CntFun, 0, CntAcc),
- UsgAcc1 = case is_pid(Job#job.pid) of
- true ->
- UsageInc = get_job_usage(Job, Now),
- UsgFun = fun(Val) -> Val + UsageInc end,
- maps:update_with(Key, UsgFun, 0, UsgAcc);
- false ->
- UsgAcc
- end,
- {CntAcc1, UsgAcc1}
- end, {StoppedUsage, #{}}, ?MODULE),
-
- % Clear the table first. This will usage stats from accounts which are not
- % longer running or haven't just stopped running their jobs in the last
- % scheduling cycle.
- true = ets:delete_all_objects(?USAGE),
-
- maps:fold(fun(Key, Usage, ok) ->
- JobCount = maps:get(Key, NumJobs, 0)
- true = ets:insert(?USAGE, {Key, Usage, maps:get(Key, NumJobs, 0)})
- end, ok, Usage).
-
-
-decay_priorities(PriorityCoeff) ->
- ok.
-
-
-adjust_priorities() ->
- ok.
-
-
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.hrl b/src/couch_replicator/src/couch_replicator_scheduler.hrl
index 5203b0c..6b88279 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.hrl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.hrl
@@ -13,3 +13,14 @@
-type job_id() :: term().
-type job_args() :: term().
+
+
+% A replication job tracked by the scheduler. The '$1' and '_' type
+% alternatives allow the record to be used as an ETS match pattern.
+%
+% NOTE(review): history() is not defined in this header, so every module
+% that includes it (e.g. couch_replicator_share) must have that type in
+% scope -- confirm where it is defined.
+-record(job, {
+    id :: job_id() | '$1' | '_',
+    rep :: #rep{} | '_',
+    pid :: undefined | pid() | '$1' | '_',
+    monitor :: undefined | reference() | '_',
+    history :: history() | '_',
+    usage_key :: undefined | binary() | '_',
+    % No default value, so the field is undefined until set. '_' keeps
+    % match patterns such as #job{pid = '$1', _ = '_'} within the type.
+    priority :: undefined | float() | '_'
+}).
diff --git a/src/couch_replicator/src/couch_replicator_share.erl b/src/couch_replicator/src/couch_replicator_share.erl
new file mode 100644
index 0000000..08189c5
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_share.erl
@@ -0,0 +1,158 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_share).
+
+-export([
+    init/0,
+
+    update_shares/2,
+
+    job_added/1,
+    job_removed/1,
+
+    get_usage_key/1,
+    new_job_priority/1,
+    add_stopped_job_usage/2,
+
+    % update_usage takes the job table and the current timestamp.
+    update_usage/2,
+    decay_all_priorities/1,
+    adjust_running_priorities/1
+]).
+
+
+-include("couch_replicator.hrl").
+-include("couch_replicator_scheduler.hrl").
+
+
+% Coefficient defaults are strings because they are passed as defaults to
+% config:get/3 and parsed afterwards.
+-define(DEFAULT_USAGE_COEFF, "0.2").
+-define(DEFAULT_PRIORITY_COEFF, "0.8").
+-define(DEFAULT_SHARES, 100).
+
+% Names of the ETS tables created by init/0.
+-define(SHARES, couch_replicator_shares).
+-define(USAGE, couch_replicator_usage).
+-define(NUM_JOBS, couch_replicator_num_jobs).
+-define(STOPPED_USAGE, couch_replicator_stopped_usage).
+
+
+% Create the named ETS tables used by this module and seed the shares table
+% from the [replicator.shares] config section. The tables are owned by the
+% calling process. Returns ok.
+init() ->
+    EtsOpts = [named_table],
+    % With named_table, ets:new/2 returns its first argument. Each table must
+    % therefore be created under the same name its macro refers to, or the
+    % match fails.
+    ?SHARES = ets:new(?SHARES, EtsOpts),
+    ?STOPPED_USAGE = ets:new(?STOPPED_USAGE, EtsOpts),
+    ?USAGE = ets:new(?USAGE, EtsOpts),
+    ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts),
+    % Each config entry maps a usage key to an integer number of shares.
+    lists:foreach(fun({K, V}) ->
+        ok = update_shares(list_to_binary(K), list_to_integer(V))
+    end, config:get("replicator.shares")).
+
+
+% Store the number of shares for a usage key (see get_usage_key/1),
+% replacing any previous value. Returns ok.
+update_shares(Key, Shares) when is_binary(Key), is_integer(Shares) ->
+    true = ets:insert(?SHARES, {Key, Shares}),
+    ok.
+
+
+% Count one more job for the job's usage key. The counter is created at 0
+% when the key is not present yet. Returns ok, like the other update
+% functions in this module.
+job_added(#job{usage_key = Key}) ->
+    ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}),
+    ok.
+
+
+% Count one job fewer for the job's usage key. When the count drops to zero
+% or below, the key's entry is deleted so the table only holds keys that
+% still have jobs. Always returns ok.
+job_removed(#job{usage_key = Key}) ->
+    case ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}) of
+        N when is_integer(N), N =< 0 ->
+            true = ets:delete(?NUM_JOBS, Key),
+            ok;
+        N when is_integer(N), N > 0 ->
+            ok
+    end.
+
+
+% The key a replication's usage is accounted under: the rep's db_name when
+% it is a binary, otherwise the name from the rep's user context.
+get_usage_key(#rep{db_name = DbName}) when is_binary(DbName) ->
+    DbName;
+get_usage_key(#rep{user_ctx = UserCtx}) ->
+    UserCtx#user_ctx.name.
+
+
+% Priority given to a newly added job. Currently always 0.0.
+% TODO: use the average priority of existing jobs so new jobs don't
+% monopolize the scheduler.
+new_job_priority(#rep{}) ->
+    0.0.
+
+
+% Record the usage of a job which stops in the middle of a scheduling cycle.
+% The usage is added to a per-key stopped usage counter. update_usage/2 adds
+% that counter to the running jobs' usage at the start of the next cycle,
+% then resets it.
+%
+% Only jobs which were running contribute: a job with pid = undefined (for
+% example a stopped job being removed) is ignored. Returns ok.
+add_stopped_job_usage(#job{pid = undefined}, {_, _, _}) ->
+    ok;
+add_stopped_job_usage(#job{usage_key = Key} = Job, {_, _, _} = Now) ->
+    Usage = get_job_usage(Job, Now),
+    ets:update_counter(?STOPPED_USAGE, Key, Usage, {Key, 0}),
+    ok.
+
+
+% Recompute per-key usage at the start of a scheduling cycle. A key's usage
+% is the usage of its jobs which stopped during the last cycle plus the
+% usage, up to Now, of its jobs currently running in the job table Ets. The
+% result replaces the previous contents of the usage table. Returns ok.
+update_usage(Ets, {_, _, _} = Now) ->
+    % Snapshot the {Key, Usage} pairs accumulated from stopped jobs, then
+    % reset the table so the next cycle starts clean.
+    StoppedUsage = maps:from_list(ets:tab2list(?STOPPED_USAGE)),
+    true = ets:delete_all_objects(?STOPPED_USAGE),
+
+    % Add each running job's usage on top of the stopped job usage. The Init
+    % argument of maps:update_with/4 is stored as-is (Fun is not applied to
+    % it), so a key seen for the first time must start at the job's usage.
+    Usage = lists:foldl(fun(#job{usage_key = Key} = Job, Acc) ->
+        JobUsage = get_job_usage(Job, Now),
+        maps:update_with(Key, fun(Val) -> Val + JobUsage end, JobUsage, Acc)
+    end, StoppedUsage, running_jobs(Ets)),
+
+    % Replace the usage table wholesale. This drops stats for keys which no
+    % longer have running jobs and had no jobs stop during the last cycle.
+    true = ets:delete_all_objects(?USAGE),
+    true = ets:insert(?USAGE, maps:to_list(Usage)),
+    ok.
+
+
+% Placeholder: not implemented yet. Ignores the job table and returns ok.
+decay_all_priorities(_Ets) ->
+    ok.
+
+
+% Placeholder: not implemented yet. Ignores the job table and returns ok.
+adjust_running_priorities(_Ets) ->
+    ok.
+
+
+% All #job{} records in table Ets whose pid field holds a pid.
+running_jobs(Ets) ->
+    MatchHead = #job{pid = '$1', _ = '_'},
+    HasPid = {is_pid, '$1'},
+    ets:select(Ets, [{MatchHead, [HasPid], ['$_']}]).
+
+
+% Job usage is the number of milliseconds a job has been running since the
+% last time it was started, measured up to Now.
+%
+% NOTE(review): the start time is taken from the first {started, Timestamp}
+% entry in the job history. This assumes the scheduler keeps the history
+% newest-first -- confirm. A job without such an entry reports 0 usage.
+get_job_usage(#job{history = History}, {_, _, _} = Now) ->
+    case lists:keyfind(started, 1, History) of
+        {started, {_, _, _} = StartedAt} ->
+            % os:timestamp/0 is OS system time and can step backwards.
+            % Clamping keeps such a step from subtracting accumulated usage.
+            max(0, timer:now_diff(Now, StartedAt) div 1000);
+        false ->
+            0
+    end.
+
+
+% Read replicator.priority_coeff from config, defaulting to
+% ?DEFAULT_PRIORITY_COEFF. Integer-looking values such as "1" are converted
+% to a float; list_to_float/1 alone would reject them with badarg.
+priority_coeff() ->
+    StrVal = config:get("replicator", "priority_coeff", ?DEFAULT_PRIORITY_COEFF),
+    case string:to_float(StrVal) of
+        {Float, ""} -> Float;
+        {error, no_float} -> float(list_to_integer(StrVal))
+    end.
+
+
+% Read replicator.usage_coeff from config, defaulting to
+% ?DEFAULT_USAGE_COEFF. Integer-looking values such as "1" are converted
+% to a float; list_to_float/1 alone would reject them with badarg.
+usage_coeff() ->
+    StrVal = config:get("replicator", "usage_coeff", ?DEFAULT_USAGE_COEFF),
+    case string:to_float(StrVal) of
+        {Float, ""} -> Float;
+        {error, no_float} -> float(list_to_integer(StrVal))
+    end.
+