You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2021/04/22 20:16:27 UTC

[couchdb] branch main updated (2e1fbc2 -> bdb3818)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


    from 2e1fbc2  Switch show-test-results.py to use python 3
     new bf9b451  Fix fabric_fdb:next_vs/1 function
     new cb8e066  Add `COUCH_JOBS_RETRYABLE` macro to couch_jobs.hrl
     new bdb3818  Add AIMD-based batching to couch_jobs activity monitor and notifier

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch_jobs/src/couch_jobs.erl                  |  21 +--
 src/couch_jobs/src/couch_jobs.hrl                  |   6 +
 src/couch_jobs/src/couch_jobs_activity_monitor.erl |  85 ++++++++---
 src/couch_jobs/src/couch_jobs_fdb.erl              |  18 +--
 src/couch_jobs/src/couch_jobs_notifier.erl         | 160 +++++++++++++--------
 src/couch_jobs/src/couch_jobs_server.erl           |  16 ++-
 src/couch_jobs/src/couch_jobs_type_monitor.erl     |  16 +--
 src/couch_jobs/src/couch_jobs_util.erl             |  58 ++++++++
 src/couch_jobs/test/couch_jobs_tests.erl           |  75 ++++++++++
 src/fabric/src/fabric2_fdb.erl                     |  15 +-
 src/fabric/test/fabric2_changes_fold_tests.erl     |  26 ++++
 11 files changed, 375 insertions(+), 121 deletions(-)
 create mode 100644 src/couch_jobs/src/couch_jobs_util.erl

[couchdb] 03/03: Add AIMD-based batching to couch_jobs activity monitor and notifier

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit bdb38184e252dbd390bccb75d18db536d9240acd
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Thu Apr 22 13:30:09 2021 -0400

    Add AIMD-based batching to couch_jobs activity monitor and notifier
    
    The couch_jobs activity monitor is responsible for finding jobs which have
    not been updated often enough by their workers and re-enqueuing them.
    Previously, when the number of jobs grew large enough, couch_jobs could
    either fail to iterate through all the jobs and time out with a 1007 error,
    or try to re-enqueue so many jobs that the total size of the commit data
    would exceed the 10MB FDB transaction limit.
    
    The couch_jobs notifier is in charge of notifying subscribers when a job's
    state changes. It notifies subscribers when it notices that a running job
    has been updated, and otherwise when a job has switched to a new state
    (running -> pending, running -> finished, etc.). Previously, if there were
    too many jobs and/or the cluster was overloaded, it was possible for the
    notifier to consistently fail with timeouts.
    
    To fix both issues, introduce batching with the batch size dynamically
    adjusted based on load. When consecutive errors occur, the batch size
    shrinks exponentially, down to 1 row per transaction. Then, with each
    success, the batch size grows linearly by a fixed amount. This self-tuning
    scheme should behave well both during overload and under normal operating
    conditions.
    
    For tests, since existing tests already cover enqueuing and subscription,
    reuse those tests but run them while errors are periodically generated.
    That is accomplished with the help of the `meck:loop/1` return
    specification.
---
 src/couch_jobs/src/couch_jobs.erl                  |   4 +-
 src/couch_jobs/src/couch_jobs_activity_monitor.erl |  82 ++++++++---
 src/couch_jobs/src/couch_jobs_fdb.erl              |  18 +--
 src/couch_jobs/src/couch_jobs_notifier.erl         | 160 +++++++++++++--------
 src/couch_jobs/src/couch_jobs_server.erl           |   8 +-
 src/couch_jobs/src/couch_jobs_util.erl             |  58 ++++++++
 src/couch_jobs/test/couch_jobs_tests.erl           |  75 ++++++++++
 7 files changed, 317 insertions(+), 88 deletions(-)

diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
index ca7bc40..1229fca 100644
--- a/src/couch_jobs/src/couch_jobs.erl
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -121,9 +121,9 @@ get_job_state(Tx, Type, JobId) when is_binary(JobId) ->
 -spec get_active_jobs_ids(jtx(), job_type()) -> [job_id()] | {error,
     any()}.
 get_active_jobs_ids(Tx, Type) ->
+    SinceVS = {versionstamp, 0, 0},
     couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
-        Since = couch_jobs_fdb:get_active_since(JTx, Type,
-            {versionstamp, 0, 0}),
+        {Since, _} = couch_jobs_fdb:get_active_since(JTx, Type, SinceVS, []),
         maps:keys(Since)
     end).
 
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor.erl b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
index 7281ef6..3ceece0 100644
--- a/src/couch_jobs/src/couch_jobs_activity_monitor.erl
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
@@ -37,11 +37,15 @@
     type,
     tref,
     timeout = 0,
-    vs = not_found
+    vs = not_found,
+    batch_size
 }).
 
 
--define(MAX_JITTER_DEFAULT, 10000).
+-define(MAX_JITTER_DEFAULT, "10000").
+-define(INIT_BATCH_SIZE, "1000").
+-define(BATCH_FACTOR, "0.75").
+-define(BATCH_INCREMENT, "100").
 -define(MISSING_TIMEOUT_CHECK, 5000).
 
 
@@ -52,7 +56,11 @@ start_link(Type) ->
 %% gen_server callbacks
 
 init([Type]) ->
-    St = #st{jtx = couch_jobs_fdb:get_jtx(), type = Type},
+    St = #st{
+        jtx = couch_jobs_fdb:get_jtx(),
+        type = Type,
+        batch_size = init_batch_size()
+    },
     {ok, schedule_check(St)}.
 
 
@@ -98,19 +106,12 @@ code_change(_OldVsn, St, _Extra) ->
 % Private helper functions
 
 check_activity(#st{jtx = JTx, type = Type, vs = not_found} = St) ->
-    NewVS = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
-        couch_jobs_fdb:get_activity_vs(JTx1, Type)
-    end),
-    St#st{vs = NewVS};
-
-check_activity(#st{jtx = JTx, type = Type, vs = VS} = St) ->
-    NewVS = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
-        NewVS = couch_jobs_fdb:get_activity_vs(JTx1, Type),
-        JobIds = couch_jobs_fdb:get_inactive_since(JTx1, Type, VS),
-        couch_jobs_fdb:re_enqueue_inactive(JTx1, Type, JobIds),
-        NewVS
-    end),
-    St#st{vs = NewVS}.
+    St#st{vs = get_activity_vs(JTx, Type)};
+
+check_activity(#st{} = St) ->
+    #st{jtx = JTx, type = Type, vs = VS, batch_size = BatchSize} = St,
+    NewBatchSize = re_enqueue_inactive(JTx, Type, VS, BatchSize),
+    St#st{vs = get_activity_vs(JTx, Type), batch_size = NewBatchSize}.
 
 
 get_timeout_msec(JTx, Type) ->
@@ -139,6 +140,53 @@ schedule_check(#st{jtx = JTx, type = Type, timeout = OldTimeout} = St) ->
     St1#st{tref = erlang:send_after(Wait, self(), check_activity)}.
 
 
+re_enqueue_inactive(JTx, Type, VS, BatchSize) ->
+    Result = try
+        couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+            Opts = [{limit, BatchSize}],
+            JobIds = couch_jobs_fdb:get_inactive_since(JTx1, Type, VS, Opts),
+            couch_jobs_fdb:re_enqueue_inactive(JTx1, Type, JobIds),
+            length(JobIds)
+        end)
+    catch
+        error:{erlfdb_error, ?ERLFDB_TRANSACTION_TOO_LARGE} ->
+            failed;
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
+            failed
+    end,
+    case Result of
+        JobCnt when is_integer(JobCnt), JobCnt < BatchSize ->
+            BatchSize;
+        JobCnt when is_integer(JobCnt), JobCnt >= BatchSize ->
+            NewBatchSize = BatchSize + batch_increment(),
+            re_enqueue_inactive(JTx, Type, VS, NewBatchSize);
+        failed ->
+            NewBatchSize = max(1, round(BatchSize * batch_factor())),
+            re_enqueue_inactive(JTx, Type, VS, NewBatchSize)
+    end.
+
+
+get_activity_vs(JTx, Type) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_activity_vs(JTx1, Type)
+    end).
+
+
 get_max_jitter_msec()->
-    config:get_integer("couch_jobs", "activity_monitor_max_jitter_msec",
+    couch_jobs_util:get_non_neg_int(activity_monitor_max_jitter_msec,
         ?MAX_JITTER_DEFAULT).
+
+
+init_batch_size() ->
+    couch_jobs_util:get_non_neg_int(activity_monitor_init_batch_size,
+        ?INIT_BATCH_SIZE).
+
+
+batch_increment() ->
+    couch_jobs_util:get_non_neg_int(activity_monitor_batch_increment,
+        ?BATCH_INCREMENT).
+
+
+batch_factor() ->
+    couch_jobs_util:get_float_0_1(activity_monitor_batch_factor,
+        ?BATCH_FACTOR).
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
index 27131ec..cea1388 100644
--- a/src/couch_jobs/src/couch_jobs_fdb.erl
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -33,8 +33,8 @@
 
     get_activity_vs/2,
     get_activity_vs_and_watch/2,
-    get_active_since/3,
-    get_inactive_since/3,
+    get_active_since/4,
+    get_inactive_since/4,
     re_enqueue_inactive/3,
 
     init_cache/0,
@@ -356,26 +356,26 @@ get_activity_vs_and_watch(#{jtx := true} = JTx, Type) ->
     end.
 
 
-get_active_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+get_active_since(#{jtx := true} = JTx, Type, Versionstamp, Opts) ->
     #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
     Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
     StartKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
     StartKeySel = erlfdb_key:first_greater_or_equal(StartKey),
     {_, EndKey} = erlfdb_tuple:range({Type}, Prefix),
-    Opts = [{streaming_mode, want_all}],
     Future = erlfdb:get_range(Tx, StartKeySel, EndKey, Opts),
-    maps:from_list(lists:map(fun({_K, V}) ->
-        erlfdb_tuple:unpack(V)
-    end, erlfdb:wait(Future))).
+    {JobIdsData, LastSeq} = lists:mapfoldl(fun({K, V}, _PrevSeq) ->
+        {Type, Seq} = erlfdb_tuple:unpack(K, Prefix),
+        {erlfdb_tuple:unpack(V), Seq}
+    end, Versionstamp, erlfdb:wait(Future)),
+    {maps:from_list(JobIdsData), LastSeq}.
 
 
-get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp, Opts) ->
     #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
     Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
     {StartKey, _} = erlfdb_tuple:range({Type}, Prefix),
     EndKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
     EndKeySel = erlfdb_key:first_greater_than(EndKey),
-    Opts = [{streaming_mode, want_all}],
     Future = erlfdb:get_range(Tx, StartKey, EndKeySel, Opts),
     lists:map(fun({_K, V}) ->
         {JobId, _} = erlfdb_tuple:unpack(V),
diff --git a/src/couch_jobs/src/couch_jobs_notifier.erl b/src/couch_jobs/src/couch_jobs_notifier.erl
index db5d5e0..37faf90 100644
--- a/src/couch_jobs/src/couch_jobs_notifier.erl
+++ b/src/couch_jobs/src/couch_jobs_notifier.erl
@@ -35,9 +35,11 @@
 -include("couch_jobs.hrl").
 
 
--define(TYPE_MONITOR_HOLDOFF_DEFAULT, 50).
+-define(TYPE_MONITOR_HOLDOFF_DEFAULT, "50").
 -define(TYPE_MONITOR_TIMEOUT_DEFAULT, "infinity").
--define(GET_JOBS_RANGE_RATIO, 0.5).
+-define(INIT_BATCH_SIZE, "1000").
+-define(BATCH_FACTOR, "0.75").
+-define(BATCH_INCREMENT, "100").
 
 
 -record(st, {
@@ -46,7 +48,8 @@
     monitor_pid,
     subs, % #{JobId => #{Ref => {Pid, State, Seq}}}
     pidmap, % #{{Jobid, Pid} => Ref}
-    refmap % #{Ref => JobId}
+    refmap, % #{Ref => JobId}
+    batch_size
 }).
 
 
@@ -76,7 +79,8 @@ init([Type]) ->
         type = Type,
         subs = #{},
         pidmap = #{},
-        refmap = #{}
+        refmap = #{},
+        batch_size = init_batch_size()
     },
     VS = get_type_vs(St),
     HoldOff = get_holdoff(),
@@ -199,31 +203,44 @@ flush_type_updated_messages(VSMax) ->
     end.
 
 
-get_jobs(#st{jtx = JTx, type = Type}, InactiveIdMap, Ratio)
-        when Ratio >= ?GET_JOBS_RANGE_RATIO ->
-    Filter = fun(JobId) -> maps:is_key(JobId, InactiveIdMap) end,
-    JobMap = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
-        couch_jobs_fdb:get_jobs(JTx1, Type, Filter)
-    end),
-    maps:map(fun(JobId, _) ->
-        case maps:is_key(JobId, JobMap) of
-            true -> maps:get(JobId, JobMap);
-            false -> {null, not_found, not_found}
-        end
-    end, InactiveIdMap);
-
-get_jobs(#st{jtx = JTx, type = Type}, InactiveIdMap, _) ->
-    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
-        maps:map(fun(JobId, _) ->
-            Job = #{job => true, type => Type, id => JobId},
-            case couch_jobs_fdb:get_job_state_and_data(JTx1, Job) of
-                {ok, Seq, State, Data} ->
-                    {Seq, State, Data};
-                {error, not_found} ->
-                    {null, not_found, not_found}
-            end
-        end, InactiveIdMap)
-    end).
+get_jobs(#st{} = St, Ids) when is_list(Ids) ->
+    #st{jtx = JTx, type = Type, batch_size = BatchSize} = St,
+    {Jobs, NewBatchSize} = get_jobs_iter(JTx, Type, Ids, BatchSize, #{}),
+    {Jobs, St#st{batch_size = NewBatchSize}}.
+
+
+get_jobs_iter(_Jtx, _Type, [], BatchSize, #{} = Acc) ->
+    {Acc, BatchSize};
+
+get_jobs_iter(JTx, Type, Ids, BatchSize, #{} = Acc0) ->
+    {BatchIds, RestIds} = case length(Ids) < BatchSize of
+        true -> {Ids, []};
+        false -> lists:split(BatchSize, Ids)
+    end,
+    Result = try
+        couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+            lists:foldl(fun(JobId, #{} = Acc) ->
+                Job = #{job => true, type => Type, id => JobId},
+                case couch_jobs_fdb:get_job_state_and_data(JTx1, Job) of
+                    {ok, Seq, State, Data} ->
+                        Acc#{JobId => {Seq, State, Data}};
+                    {error, not_found} ->
+                        Acc#{JobId => {null, not_found, not_found}}
+                end
+            end, Acc0, BatchIds)
+        end)
+    catch
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
+            failed
+    end,
+    case Result of
+        #{} = AccF ->
+            NewBatchSize = BatchSize + batch_increment(),
+            get_jobs_iter(JTx, Type, RestIds, NewBatchSize, AccF);
+        failed ->
+            NewBatchSize = max(1, round(BatchSize * batch_factor())),
+            get_jobs_iter(JTx, Type, Ids, NewBatchSize, Acc0)
+    end.
 
 
 get_type_vs(#st{jtx = JTx, type = Type}) ->
@@ -236,27 +253,47 @@ get_type_vs(#st{jtx = JTx, type = Type}) ->
 % and updated at least once since the given versionstamp. These are relatively
 % cheap to find as it's just a range read in the ?ACTIVITY subspace.
 %
-get_active_since(#st{} = _St, not_found) ->
-    #{};
-
-get_active_since(#st{jtx = JTx, type = Type, subs = Subs}, VS) ->
-    AllUpdated = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
-        couch_jobs_fdb:get_active_since(JTx1, Type, VS)
-    end),
-    maps:map(fun(_JobId, Data) ->
+get_active_since(#st{} = St, not_found) ->
+    {#{}, St};
+
+get_active_since(#st{} = St, VS) ->
+    #st{jtx = JTx, type = Type, subs = Subs, batch_size = BatchSize} = St,
+    {Updated, NewBatchSize} = get_active_iter(JTx, Type, VS, BatchSize, #{}),
+    UpdatedSubs = maps:map(fun(_JobId, Data) ->
         {VS, running, Data}
-    end, maps:with(maps:keys(Subs), AllUpdated)).
+    end, maps:with(maps:keys(Subs), Updated)),
+    {UpdatedSubs, St#st{batch_size = NewBatchSize}}.
+
+
+get_active_iter(JTx, Type, VS, BatchSize, #{} = Acc) ->
+    Opts = [{limit, BatchSize}],
+    Result = try
+        couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+            couch_jobs_fdb:get_active_since(JTx1, Type, VS, Opts)
+        end)
+    catch
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
+            failed
+    end,
+    case Result of
+        {Updated, _FinalSeq} when map_size(Updated) < BatchSize ->
+            {maps:merge(Acc, Updated), BatchSize};
+        {Updated, FinalSeq} when map_size(Updated) >= BatchSize ->
+            Acc1 = maps:merge(Acc, Updated),
+            NewBatchSize = BatchSize + batch_increment(),
+            NextSeq = fabric2_fdb:next_vs(FinalSeq),
+            get_active_iter(JTx, Type, NextSeq, NewBatchSize, Acc1);
+        failed ->
+            NewBatchSize = max(1, round(BatchSize * batch_factor())),
+            get_active_iter(JTx, Type, VS, NewBatchSize, Acc)
+    end.
 
 
 try_notify_subscribers(ActiveVS, #st{} = St) ->
     try
         notify_subscribers(ActiveVS, St)
     catch
-        error:{timeout, _} ->
-            try_notify_subscribers(ActiveVS, St);
-        error:{erlfdb_error, ?ERLFDB_TRANSACTION_TIMED_OUT} ->
-            try_notify_subscribers(ActiveVS, St);
-        error:{erlfdb_error, Code} when ?ERLFDB_IS_RETRYABLE(Code) ->
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
             try_notify_subscribers(ActiveVS, St)
     end.
 
@@ -267,14 +304,13 @@ notify_subscribers(_, #st{subs = Subs} = St) when map_size(Subs) =:= 0 ->
 notify_subscribers(ActiveVS, #st{} = St1) ->
     % First gather the easy (cheap) active jobs. Then with those out of way
     % inspect each job to get its state.
-    Active = get_active_since(St1, ActiveVS),
-    St2 = notify_job_ids(Active, St1),
+    {Active, St2} = get_active_since(St1, ActiveVS),
+    St3 = notify_job_ids(Active, St2),
     ActiveIds = maps:keys(Active),
-    Subs = St2#st.subs,
-    InactiveIdMap = maps:without(ActiveIds, Subs),
-    InactiveRatio = maps:size(InactiveIdMap) / maps:size(Subs),
-    Inactive = get_jobs(St2, InactiveIdMap, InactiveRatio),
-    notify_job_ids(Inactive, St2).
+    Subs = St3#st.subs,
+    InactiveIds = maps:keys(maps:without(ActiveIds, Subs)),
+    {Inactive, St4}  = get_jobs(St3, InactiveIds),
+    notify_job_ids(Inactive, St4).
 
 
 notify_job_ids(#{} = Jobs, #st{type = Type} = St0) ->
@@ -306,13 +342,25 @@ notify(Pid, Ref, Type, Id, State, Data) ->
 
 
 get_holdoff() ->
-    config:get_integer("couch_jobs", "type_monitor_holdoff_msec",
+    couch_jobs_util:get_non_neg_int(type_monitor_holdoff_msec,
         ?TYPE_MONITOR_HOLDOFF_DEFAULT).
 
 
 get_timeout() ->
-    Default =  ?TYPE_MONITOR_TIMEOUT_DEFAULT,
-    case config:get("couch_jobs", "type_monitor_timeout_msec", Default) of
-        "infinity" -> infinity;
-        Milliseconds -> list_to_integer(Milliseconds)
-    end.
+    couch_jobs_util:get_timeout(type_monitor_timeout_msec,
+        ?TYPE_MONITOR_TIMEOUT_DEFAULT).
+
+
+init_batch_size() ->
+    couch_jobs_util:get_non_neg_int(notifier_init_batch_size,
+        ?INIT_BATCH_SIZE).
+
+
+batch_increment() ->
+    couch_jobs_util:get_non_neg_int(notifier_batch_increment,
+        ?BATCH_INCREMENT).
+
+
+batch_factor() ->
+    couch_jobs_util:get_float_0_1(notifier_batch_factor,
+        ?BATCH_FACTOR).
diff --git a/src/couch_jobs/src/couch_jobs_server.erl b/src/couch_jobs/src/couch_jobs_server.erl
index 5b087ef..2a080e9 100644
--- a/src/couch_jobs/src/couch_jobs_server.erl
+++ b/src/couch_jobs/src/couch_jobs_server.erl
@@ -34,8 +34,8 @@
 ]).
 
 
--define(TYPE_CHECK_PERIOD_DEFAULT, 15000).
--define(MAX_JITTER_DEFAULT, 5000).
+-define(TYPE_CHECK_PERIOD_DEFAULT, "15000").
+-define(MAX_JITTER_DEFAULT, "5000").
 
 
 start_link() ->
@@ -188,10 +188,10 @@ schedule_check() ->
 
 
 get_period_msec() ->
-    config:get_integer("couch_jobs", "type_check_period_msec",
+    couch_jobs_util:get_non_neg_int(type_check_period_msec,
         ?TYPE_CHECK_PERIOD_DEFAULT).
 
 
 get_max_jitter_msec() ->
-    config:get_integer("couch_jobs", "type_check_max_jitter_msec",
+    couch_jobs_util:get_non_neg_int(type_check_max_jitter_msec,
         ?MAX_JITTER_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_util.erl b/src/couch_jobs/src/couch_jobs_util.erl
new file mode 100644
index 0000000..747ab60
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_util.erl
@@ -0,0 +1,58 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_util).
+
+
+-export([
+    get_non_neg_int/2,
+    get_float_0_1/2,
+    get_timeout/2
+]).
+
+
+get_non_neg_int(Key, Default) when is_atom(Key), is_list(Default) ->
+    StrVal = config:get("couch_jobs", atom_to_list(Key), Default),
+    non_neg_int(Key, StrVal).
+
+
+get_float_0_1(Key, Default) when is_atom(Key), is_list(Default) ->
+    StrVal = config:get("couch_jobs", atom_to_list(Key), Default),
+    float_0_1(Key, StrVal).
+
+
+get_timeout(Key, Default) when is_atom(Key), is_list(Default) ->
+    case config:get("couch_jobs", atom_to_list(Key), Default) of
+        "infinity" -> infinity;
+        StrVal -> non_neg_int(Key, StrVal)
+    end.
+
+
+non_neg_int(Name, Str) ->
+    try
+        Val = list_to_integer(Str),
+        true = Val > 0,
+        Val
+    catch _:_ ->
+        erlang:error({invalid_non_neg_integer, {couch_jobs, Name, Str}})
+    end.
+
+
+float_0_1(Name, Str) ->
+    Val = try
+        list_to_float(Str)
+    catch error:badarg ->
+        erlang:error({invalid_float, {couch_jobs, Name, Str}})
+    end,
+    if Val >= 0.0 andalso Val =< 1.0 -> Val; true ->
+        erlang:error({float_out_of_range, {couch_jobs, Name, Str}})
+    end.
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
index 3582c64..d006672 100644
--- a/src/couch_jobs/test/couch_jobs_tests.erl
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -77,7 +77,34 @@ couch_jobs_basic_test_() ->
     }.
 
 
+couch_jobs_batching_test_() ->
+    {
+        "Test couch jobs batching logic",
+        {
+            setup,
+            fun setup_couch/0, fun teardown_couch/1,
+            {
+                foreach,
+                fun setup_batch/0, fun teardown_batch/1,
+                [
+                    ?TDEF_FE(accept_blocking),
+                    ?TDEF_FE(resubmit_enqueues_job),
+                    ?TDEF_FE(accept_max_schedtime),
+                    ?TDEF_FE(accept_no_schedule),
+                    ?TDEF_FE(subscribe),
+                    ?TDEF_FE(remove_when_subscribed_and_pending),
+                    ?TDEF_FE(remove_when_subscribed_and_running),
+                    ?TDEF_FE(subscribe_wait_multiple),
+                    ?TDEF_FE(enqueue_inactive, 15)
+                ]
+            }
+        }
+    }.
+
+
 setup_couch() ->
+    meck:new(couch_jobs_fdb, [passthrough]),
+    meck:new(couch_jobs_util, [passthrough]),
     % Because of a circular dependency between `couch_jobs` and `fabric` in
     % `fabric2_db_expiration` module, disable db expiration so when
     % `couch_jobs` is stopped the test, `fabric` app doesn't get torn down as
@@ -122,6 +149,54 @@ teardown(#{}) ->
     ok.
 
 
+setup_batch() ->
+    Ctx = setup(),
+
+    % Simulate having too many jobs to fit in a 10MB transaction
+    meck:expect(couch_jobs_fdb, re_enqueue_inactive, 3, meck:loop([
+        meck:raise(error, {erlfdb_error, 2101}),
+        meck:passthrough()
+    ])),
+
+    % Simulate get_inactive_since GRV timing out
+    meck:expect(couch_jobs_fdb, get_inactive_since, 4, meck:loop([
+        meck:raise(error, {erlfdb_error, 1007}),
+        meck:passthrough()
+    ])),
+
+    % Simulate get_active_since transaction timing out
+    meck:expect(couch_jobs_fdb, get_active_since, 4, meck:loop([
+        meck:raise(error, {erlfdb_error, 1031}),
+        meck:passthrough()
+    ])),
+
+    % Set up batching parameters to test small batches down to size 1
+    meck:expect(couch_jobs_util, get_non_neg_int, [
+        {[notifier_batch_increment, '_'], 1},
+        {[activity_monitor_batch_increment, '_'], 1},
+        {2, meck:passthrough()}
+    ]),
+    meck:expect(couch_jobs_util, get_float_0_1, [
+        {[notifier_batch_factor, '_'], 0.0001},
+        {[activity_monitor_batch_factor, '_'], 0.0001},
+        {2, meck:passthrough()}
+    ]),
+
+    Ctx.
+
+
+teardown_batch(Ctx) ->
+    teardown(Ctx),
+    meck:reset(couch_jobs_fdb),
+    meck:reset(couch_jobs_util),
+    meck:expect(couch_jobs_fdb, re_enqueue_inactive, 3, meck:passthrough()),
+    meck:expect(couch_jobs_fdb, get_active_since, 4, meck:passthrough()),
+    meck:expect(couch_jobs_fdb, get_inactive_since, 4, meck:passthrough()),
+    meck:expect(couch_jobs_util, get_non_neg_int, 2, meck:passthrough()),
+    meck:expect(couch_jobs_util, get_float_0_1, 2, meck:passthrough()),
+    ok.
+
+
 clear_jobs() ->
     couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
         #{jobs_path := Jobs, tx := Tx} = JTx,

[couchdb] 01/03: Fix fabric_fdb:next_vs/1 function

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit bf9b451689482ea3f046a4551300af73fbcba616
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Thu Apr 22 13:05:48 2021 -0400

    Fix fabric_fdb:next_vs/1 function
    
    Also, add a clause for the variant without a txid part.
    
    Previously, `next_vs/1` could overflow the batch or the txid field. The range
    of values for both of those is [0..16#FFFF], so the correct check before
    incrementing each field should be `< 16#FFFF` instead of `=< 16#FFFF`. Also
    switch to hex constants, since we're dealing with bytes and other places in
    the file already use 16#FFFF for max values in the versionstamp fields.
    
    The tests were included in the fabric2_changes_fold_tests module as next_vs is
    relevant for the _changes feed since_seq calculation.
---
 src/fabric/src/fabric2_fdb.erl                 | 15 ++++++++++++---
 src/fabric/test/fabric2_changes_fold_tests.erl | 26 ++++++++++++++++++++++++++
 2 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index bccf1d6..a4c3f89 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -1186,18 +1186,27 @@ seq_to_vs(Seq) when is_binary(Seq) ->
 
 
 next_vs({versionstamp, VS, Batch, TxId}) ->
-    {V, B, T} = case TxId =< 65535 of
+    {V, B, T} = case TxId < 16#FFFF of
         true ->
             {VS, Batch, TxId + 1};
         false ->
-            case Batch =< 65535 of
+            case Batch < 16#FFFF of
                 true ->
                     {VS, Batch + 1, 0};
                 false ->
                     {VS + 1, 0, 0}
             end
     end,
-    {versionstamp, V, B, T}.
+    {versionstamp, V, B, T};
+
+next_vs({versionstamp, VS, Batch}) ->
+    {V, B} = case Batch < 16#FFFF of
+        true ->
+            {VS, Batch + 1};
+        false ->
+            {VS + 1, 0}
+    end,
+    {versionstamp, V, B}.
 
 
 new_versionstamp(Tx) ->
diff --git a/src/fabric/test/fabric2_changes_fold_tests.erl b/src/fabric/test/fabric2_changes_fold_tests.erl
index fa79f25..a8578f9 100644
--- a/src/fabric/test/fabric2_changes_fold_tests.erl
+++ b/src/fabric/test/fabric2_changes_fold_tests.erl
@@ -22,6 +22,32 @@
 -define(DOC_COUNT, 25).
 
 
+next_vs_function_with_txid_test() ->
+    Cases = [
+        {{0, 0, 1}, {0, 0, 0}},
+        {{0, 0, 2}, {0, 0, 1}},
+        {{0, 1, 0}, {0, 0, 16#FFFF}},
+        {{0, 2, 0}, {0, 1, 16#FFFF}},
+        {{1, 0, 0}, {0, 16#FFFF, 16#FFFF}},
+        {{2, 0, 0}, {1, 16#FFFF, 16#FFFF}}
+    ],
+    Next = fun({V, B, T}) -> fabric2_fdb:next_vs({versionstamp, V, B, T}) end,
+    [?assertEqual({versionstamp, RV, RB, RT}, Next({V, B, T})) ||
+        {{RV, RB, RT}, {V, B, T}} <- Cases].
+
+
+next_vs_function_without_txid_test() ->
+    Cases = [
+        {{0, 1}, {0, 0}},
+        {{0, 2}, {0, 1}},
+        {{1, 0}, {0, 16#FFFF}},
+        {{2, 0}, {1, 16#FFFF}}
+    ],
+    Next = fun({V, B}) -> fabric2_fdb:next_vs({versionstamp, V, B}) end,
+    [?assertEqual({versionstamp, RV, RB}, Next({V, B})) ||
+        {{RV, RB}, {V, B}} <- Cases].
+
+
 changes_fold_test_() ->
     {
         "Test changes fold operations",

[couchdb] 02/03: Add `COUCH_JOBS_RETRYABLE` macro to couch_jobs.hrl

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit cb8e066e2e731a245a74c0b93a6b51500691dc0d
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Thu Apr 22 13:17:19 2021 -0400

    Add `COUCH_JOBS_RETRYABLE` macro to couch_jobs.hrl
    
    This macro can be used to simplify retryable error checks throughout couch_jobs
    app. It checks for erlfdb retryable errors (1007, 1009, etc), for the 1031
    (`transaction_timed_out`) error and for `{timeout, _}`.
---
 src/couch_jobs/src/couch_jobs.erl                  | 17 +++++------------
 src/couch_jobs/src/couch_jobs.hrl                  |  6 ++++++
 src/couch_jobs/src/couch_jobs_activity_monitor.erl |  3 +--
 src/couch_jobs/src/couch_jobs_server.erl           |  8 ++++++--
 src/couch_jobs/src/couch_jobs_type_monitor.erl     | 16 ++--------------
 5 files changed, 20 insertions(+), 30 deletions(-)

diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
index 8d2fcd8..ca7bc40 100644
--- a/src/couch_jobs/src/couch_jobs.erl
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -349,11 +349,7 @@ accept_loop(Type, NoSched, MaxSchedTime, Timeout) ->
     AcceptResult = try
         couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), TxFun)
     catch
-        error:{timeout, _} ->
-            retry;
-        error:{erlfdb_error, ?ERLFDB_TRANSACTION_TIMED_OUT} ->
-            retry;
-        error:{erlfdb_error, Err} when ?ERLFDB_IS_RETRYABLE(Err) ->
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
             retry
     end,
     case AcceptResult of
@@ -389,15 +385,12 @@ wait_pending(PendingWatch, MaxSTime, UserTimeout, NoSched) ->
         erlfdb:wait(PendingWatch, [{timeout, Timeout}]),
         ok
     catch
-        error:{erlfdb_error, ?ERLFDB_TRANSACTION_TIMED_OUT} ->
-            erlfdb:cancel(PendingWatch, [flush]),
-            retry;
-        error:{erlfdb_error, Error} when ?ERLFDB_IS_RETRYABLE(Error) ->
-            erlfdb:cancel(PendingWatch, [flush]),
-            retry;
         error:{timeout, _} ->
             erlfdb:cancel(PendingWatch, [flush]),
-            {error, not_found}
+            {error, not_found};
+        error:{Err, Tag} when ?COUCH_JOBS_RETRYABLE(Err, Tag) ->
+            erlfdb:cancel(PendingWatch, [flush]),
+            retry
     end.
 
 
diff --git a/src/couch_jobs/src/couch_jobs.hrl b/src/couch_jobs/src/couch_jobs.hrl
index bb561b1..a160605 100644
--- a/src/couch_jobs/src/couch_jobs.hrl
+++ b/src/couch_jobs/src/couch_jobs.hrl
@@ -40,6 +40,12 @@
 -define(COUCH_JOBS_CURRENT, '$couch_jobs_current').
 -define(UNDEFINED_MAX_SCHEDULED_TIME, 1 bsl 36).
 
+-define(COUCH_JOBS_RETRYABLE(Tag, Err), (
+    (Tag == timeout) orelse (
+        (Tag == erlfdb_error andalso ?ERLFDB_IS_RETRYABLE(Err)) orelse
+        (Tag == erlfdb_error andalso Err =:= ?ERLFDB_TRANSACTION_TIMED_OUT))
+)).
+
 
 -type jtx() :: map() | undefined | tuple().
 -type job_id() :: binary().
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor.erl b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
index d5dfa41..7281ef6 100644
--- a/src/couch_jobs/src/couch_jobs_activity_monitor.erl
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
@@ -72,8 +72,7 @@ handle_info(check_activity, St) ->
     St1 = try
         check_activity(St)
     catch
-        error:{erlfdb_error, Err} when ?ERLFDB_IS_RETRYABLE(Err) orelse
-                Err =:= ?ERLFDB_TRANSACTION_TIMED_OUT ->
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
             LogMsg = "~p : type:~p got ~p error, possibly from overload",
             couch_log:error(LogMsg, [?MODULE, St#st.type, Err]),
             St
diff --git a/src/couch_jobs/src/couch_jobs_server.erl b/src/couch_jobs/src/couch_jobs_server.erl
index 2e03c7d..5b087ef 100644
--- a/src/couch_jobs/src/couch_jobs_server.erl
+++ b/src/couch_jobs/src/couch_jobs_server.erl
@@ -15,6 +15,9 @@
 -behaviour(gen_server).
 
 
+-include("couch_jobs.hrl").
+
+
 -export([
     start_link/0,
     get_notifier_server/1,
@@ -170,8 +173,9 @@ fdb_types() ->
             couch_jobs_fdb:get_types(JTx)
         end)
     catch
-        error:{timeout, _} ->
-            couch_log:warning("~p : Timed out connecting to FDB", [?MODULE]),
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
+            LogMsg = "~p : Error ~p:~p connecting to FDB",
+            couch_log:warning(LogMsg, [?MODULE, Tag, Err]),
             []
     end.
 
diff --git a/src/couch_jobs/src/couch_jobs_type_monitor.erl b/src/couch_jobs/src/couch_jobs_type_monitor.erl
index b58f34e..95aee4e 100644
--- a/src/couch_jobs/src/couch_jobs_type_monitor.erl
+++ b/src/couch_jobs/src/couch_jobs_type_monitor.erl
@@ -55,13 +55,7 @@ loop(#st{vs = VS, timeout = Timeout} = St) ->
     try
         erlfdb:wait(Watch, [{timeout, Timeout}])
     catch
-        error:{erlfdb_error, ?ERLFDB_TRANSACTION_TIMED_OUT} ->
-            erlfdb:cancel(Watch, [flush]),
-            ok;
-        error:{erlfdb_error, Code} when ?ERLFDB_IS_RETRYABLE(Code) ->
-            erlfdb:cancel(Watch, [flush]),
-            ok;
-        error:{timeout, _} ->
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
             erlfdb:cancel(Watch, [flush]),
             ok
     end,
@@ -88,13 +82,7 @@ get_vs_and_watch(#st{} = St) ->
             couch_jobs_fdb:get_activity_vs_and_watch(JTx1, Type)
         end)
     catch
-        error:{erlfdb_error, ?ERLFDB_TRANSACTION_TIMED_OUT} ->
-            timer:sleep(HoldOff),
-            get_vs_and_watch(St);
-        error:{erlfdb_error, Code} when ?ERLFDB_IS_RETRYABLE(Code) ->
-            timer:sleep(HoldOff),
-            get_vs_and_watch(St);
-        error:{timeout, _} ->
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
             timer:sleep(HoldOff),
             get_vs_and_watch(St)
     end.