You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@couchdb.apache.org by GitBox <gi...@apache.org> on 2021/03/01 20:46:08 UTC

[GitHub] [couchdb] nickva commented on a change in pull request #3364: Fair Share Replication Scheduler Implementation (3.x)

nickva commented on a change in pull request #3364:
URL: https://github.com/apache/couchdb/pull/3364#discussion_r585037305



##########
File path: src/couch_replicator/src/couch_replicator_share.erl
##########
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Algorithm implemented here is based on the "A Fair Share Scheduler" by Judy
+% Kay and Piers Lauder [1].
+%
+% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
+%
+
+-module(couch_replicator_share).
+
+-export([
+    init/0,
+    clear/0,
+
+    update_shares/2,
+    reset_shares/1,
+
+    job_added/1,
+    job_removed/1,
+
+    priority/1,
+    usage/1,
+    num_jobs/1,
+    shares/1,
+
+    charge/3,
+
+    decay_priorities/0,
+    update_priority/1,
+    update_usage/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+
+% Usage coefficient decays historic usage every scheduling cycle. For example,
+% the usage value for a job running 1 minute is 60000000 (i.e microseconds /
+% minute). If the job stops running it will take about 26 cycles (minutes) for
+% it to decay to 0 and the system to "forget" about it completely:
+%
+%  trunc(60000000 * math:pow(0.5, 26)) = 0
+%
+-define(DEFAULT_USAGE_COEFF, 0.5).
+
+% Priority coefficient decays all the job priorities such that they slowly
+% drift towards the front of the run queue. The priority value for a single job
+% which ran for one 1 minute scheduler cycle and has the default number of 100
+% shares is 60000000 / (100 * 100) = 6000. If coefficient is 0.98 it will take
+% about 430 cycles i.e. about 7 hours for the job to drift towards the front of
+% the queue:
+%
+%   trunc(6000 * math:pow(0.98, 431)) = 0
+%   430 / 60 = 7.2 hrs
+%
+-define(DEFAULT_PRIORITY_COEFF, 0.98).
+
+
+-define(MIN_SHARES, 1).
+-define(MAX_SHARES, 1000).
+-define(DEFAULT_SHARES, 100).
+
+-define(SHARES, couch_replicator_shares).
+-define(PRIORITIES, couch_replicator_priorities).
+-define(USAGE, couch_replicator_usage).
+-define(CHARGES, couch_replicator_stopped_usage).
+-define(NUM_JOBS, couch_replicator_num_jobs).
+
+
+% Create the named, public ETS tables used by this module and load the
+% configured share values from the "replicator.shares" config section.
+%
+% Share values come from user-editable configuration, so an entry whose value
+% is not a valid integer is skipped instead of crashing initialization. Keys
+% without an entry in the shares table get the default number of shares.
+%
+init() ->
+    EtsOpts = [named_table, public],
+    ?SHARES = ets:new(?SHARES, EtsOpts), % {Key, Shares}
+    ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts), % {JobId, Priority}
+    ?USAGE = ets:new(?USAGE, EtsOpts), % {Key, Usage}
+    ?CHARGES = ets:new(?CHARGES, EtsOpts), % {Key, Charges}
+    ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts), % {Key, NumJobs}
+    lists:foreach(fun({K, V}) ->
+        % Only the string -> integer conversion is protected; anything else
+        % failing is still a genuine error and should crash.
+        try list_to_integer(V) of
+            Shares -> update_shares(list_to_binary(K), Shares)
+        catch
+            error:badarg -> ok
+        end
+    end, config:get("replicator.shares")).
+
+
+% Delete all the ETS tables created by init/0.
+%
+% ets:delete/1 raises badarg for a table which does not exist. That case is
+% ignored so clear/0 can be called even if some or all of the tables were
+% never created. Any other exception is not swallowed.
+%
+clear() ->
+    Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
+    lists:foreach(fun(T) ->
+        try
+            ets:delete(T)
+        catch
+            error:badarg -> ok
+        end
+    end, Tables).
+
+
+% Store the number of shares for Key. Meant to be called when the user updates
+% the replicator.shares config section. Values outside of the
+% [?MIN_SHARES, ?MAX_SHARES] range are clamped to the nearest limit.
+%
+update_shares(Key, Shares) when is_integer(Shares) ->
+    AtLeastMin = max(?MIN_SHARES, Shares),
+    Clamped = min(?MAX_SHARES, AtLeastMin),
+    ets:insert(?SHARES, {Key, Clamped}).
+
+
+% Remove the configured shares for Key. Called when the config value is
+% deleted; from then on the key is treated as having the default number of
+% shares.
+reset_shares(Key) ->
+    true = ets:delete(?SHARES, Key).
+
+
+% Account for a newly added job: bump the job count of the job's key and give
+% the job an initial priority.
+%
+job_added(#job{} = Job) ->
+    JobKey = key(Job),
+    % Creates the counter at 0 first if this is the key's first job
+    ets:update_counter(?NUM_JOBS, JobKey, 1, {JobKey, 0}),
+    % Adjust the priority as if the job already ran during one scheduler
+    % cycle, otherwise new jobs would start at priority 0 (highest).
+    update_priority(Job).
+
+
+% Account for a removed job: drop its priority entry and decrement the job
+% count of its key. The counter entry is deleted once it reaches zero (or
+% below). Always returns ok.
+%
+job_removed(#job{id = JobId} = Job) ->
+    Key = key(Job),
+    ets:delete(?PRIORITIES, JobId),
+    Remaining = ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}),
+    case Remaining =< 0 of
+        true -> ets:delete(?NUM_JOBS, Key);
+        false -> ok
+    end,
+    ok.
+
+
+% Return the current priority of a job. A missing entry is reported as 0,
+% since entries whose value is 0 are removed from the table.
+%
+priority(JobId) ->
+    Found = ets:lookup(?PRIORITIES, JobId),
+    case Found of
+        [] -> 0;
+        [{_Id, Value}] -> Value
+    end.
+
+
+% Return the usage recorded for Key, or 0 when there is no entry for it.
+%
+usage(Key) ->
+    Found = ets:lookup(?USAGE, Key),
+    case Found of
+        [] -> 0;
+        [{_Key, Value}] -> Value
+    end.
+
+
+% Return the number of jobs counted for Key, or 0 when there is no entry
+% for it.
+%
+num_jobs(Key) ->
+    Found = ets:lookup(?NUM_JOBS, Key),
+    case Found of
+        [] -> 0;
+        [{_Key, Count}] -> Count
+    end.
+
+
+% Return the number of shares stored for Key, or ?DEFAULT_SHARES when no
+% value is stored for it.
+%
+shares(Key) ->
+    Found = ets:lookup(?SHARES, Key),
+    case Found of
+        [] -> ?DEFAULT_SHARES;
+        [{_Key, Value}] -> Value
+    end.
+
+
+% Add the charges of a job for the given interval to the charges accumulated
+% for the job's key. A job without a pid is not charged and the result is 0,
+% otherwise the result is the updated charges counter for the key. The amount
+% charged is computed by job_charges/3.
+%
+charge(#job{pid = undefined}, _Interval, _Now) ->
+    0;
+
+charge(#job{} = Job, Interval, {_, _, _} = Now) when is_integer(Interval) ->
+    JobKey = key(Job),
+    Amount = job_charges(Job, Interval, Now),
+    % Creates the counter at 0 first if the key had no charges yet
+    ets:update_counter(?CHARGES, JobKey, Amount, {JobKey, 0}).
+
+
+% Decay all the job priorities by the priority coefficient. In [1] this is
+% described in the "Decay of Process Priorities" section.
+%
+decay_priorities() ->
+    Coeff = priority_coeff(),
+    decay(?PRIORITIES, Coeff),
+    % Priorities which decayed to 0 are removed. A missing priority is
+    % assumed to be 0 when it is looked up.
+    clear_zero(?PRIORITIES).
+
+
+% This is the main part of the algorithm. In [1] it is described in the
+% "Priority Adjustment" section.
+%
+% The job's priority is incremented by the usage of its key multiplied by the
+% key's number of jobs and divided by the square of the key's shares. The
+% increment is truncated to an integer. Returns the updated priority value.
+%
+update_priority(#job{id = Id} = Job) ->
+    Key = key(Job),
+    Shares = shares(Key),
+    Usage = usage(Key),
+    NumJobs = num_jobs(Key),
+    Increment = trunc((Usage * NumJobs) / (Shares * Shares)),
+    ets:update_counter(?PRIORITIES, Id, Increment, {Id, 0}).
+
+
+% This is the "User-Level Scheduling" part from [1]
+%
+% Historic usage is decayed first, then the charges accumulated since the
+% last call are added on top of it.
+%
+update_usage() ->
+    decay(?USAGE, usage_coeff()),
+    clear_zero(?USAGE),
+    AddCharges = fun({Key, Charges}, Acc) ->
+        ets:update_counter(?USAGE, Key, Charges, {Key, 0}),
+        Acc
+    end,
+    ets:foldl(AddCharges, ok, ?CHARGES),
+    % The next interval starts with an empty charges table
+    ets:delete_all_objects(?CHARGES).
+
+
+% Private helper functions
+
+% Multiply the value of every {Key, Value} object in the named table Ets by
+% Coeff and truncate the result to an integer. Objects are rewritten in place
+% with ets:select_replace/2, keeping their keys unchanged. Returns the result
+% of ets:select_replace/2, i.e. the number of objects replaced.
+%
+decay(Ets, Coeff) when is_atom(Ets) ->
+    % '$1' binds the key and '$2' the current value of each object
+    Head = {'$1', '$2'},
+    % Match spec body building {Key, trunc(Value * Coeff)}. The outer double
+    % braces construct a tuple, {const, Coeff} inserts Coeff as a literal.
+    Result = {{'$1', {trunc, {'*', '$2', {const, Coeff}}}}},
+    ets:select_replace(Ets, [{Head, [], [Result]}]).
+
+
+% Delete every {Key, 0} object from the named table Ets. Returns the result of
+% ets:select_delete/2, i.e. the number of objects deleted.
+%
+clear_zero(Ets) when is_atom(Ets) ->
+    % Match any key with a value of 0; a body of true marks it for deletion
+    ets:select_delete(Ets, [{{'_', 0}, [], [true]}]).

Review comment:
       Excellent point about it changing in the future. Agree. Will update it 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org