You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2021/04/22 17:48:59 UTC

[couchdb] 03/03: Add AIMD-based batching to couch_jobs activity monitor and notifier

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch add-batching-to-couch-jobs
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 331f45fec1d7b7c71304d2cb632bae2343793b7b
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Thu Apr 22 13:30:09 2021 -0400

    Add AIMD-based batching to couch_jobs activity monitor and notifier
    
    The couch_jobs activity monitor is responsible for checking jobs which have
    not been updated often enough by their workers and re-enqueuing them.
    Previously, when the number of jobs grew high enough, couch_jobs could
    either fail to iterate through all the jobs and time out with a 1007 error,
    or try to re-enqueue so many jobs that the sum of the commit data would end
    up being larger than the 10MB FDB limit.
    
    The couch_jobs notifier is in charge of notifying subscribers when job state
    changes. If the jobs are updated, it notifies when it notices the updates;
    otherwise it notifies when a job switches to a new state (update -> pending,
    update -> finished, etc.). Previously, if there were too many jobs and/or
    the cluster was overloaded, it was possible for the notifier to fail with
    timeouts.
    
    To fix both issues, introduce batching with the batch size dynamically
    adjusted based on load. As more consecutive errors occur, the batch will
    shrink exponentially down to 1 row per transaction. Then, with each success,
    the batch will grow linearly by a fixed amount. This self-adjusting behavior
    should perform well both during overload and under normal operating
    conditions.
    
    For tests, since there are already tests which test enqueuing and subscription,
    use the same tests but make sure they are run while errors are periodically
    generated. That's accomplished with the help of the `meck:loop/1` return
    specification.
---
 src/couch_jobs/src/couch_jobs.erl                  |   4 +-
 src/couch_jobs/src/couch_jobs_activity_monitor.erl |  82 ++++++++---
 src/couch_jobs/src/couch_jobs_fdb.erl              |  18 +--
 src/couch_jobs/src/couch_jobs_notifier.erl         | 160 +++++++++++++--------
 src/couch_jobs/src/couch_jobs_server.erl           |   8 +-
 src/couch_jobs/src/couch_jobs_util.erl             |  59 ++++++++
 src/couch_jobs/test/couch_jobs_tests.erl           |  75 ++++++++++
 7 files changed, 318 insertions(+), 88 deletions(-)

diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
index ca7bc40..1229fca 100644
--- a/src/couch_jobs/src/couch_jobs.erl
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -121,9 +121,9 @@ get_job_state(Tx, Type, JobId) when is_binary(JobId) ->
 -spec get_active_jobs_ids(jtx(), job_type()) -> [job_id()] | {error,
     any()}.
 get_active_jobs_ids(Tx, Type) ->
+    SinceVS = {versionstamp, 0, 0},
     couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
-        Since = couch_jobs_fdb:get_active_since(JTx, Type,
-            {versionstamp, 0, 0}),
+        {Since, _} = couch_jobs_fdb:get_active_since(JTx, Type, SinceVS, []),
         maps:keys(Since)
     end).
 
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor.erl b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
index 7281ef6..3ceece0 100644
--- a/src/couch_jobs/src/couch_jobs_activity_monitor.erl
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
@@ -37,11 +37,15 @@
     type,
     tref,
     timeout = 0,
-    vs = not_found
+    vs = not_found,
+    batch_size
 }).
 
 
--define(MAX_JITTER_DEFAULT, 10000).
+-define(MAX_JITTER_DEFAULT, "10000").
+-define(INIT_BATCH_SIZE, "1000").
+-define(BATCH_FACTOR, "0.75").
+-define(BATCH_INCREMENT, "100").
 -define(MISSING_TIMEOUT_CHECK, 5000).
 
 
@@ -52,7 +56,11 @@ start_link(Type) ->
 %% gen_server callbacks
 
 init([Type]) ->
-    St = #st{jtx = couch_jobs_fdb:get_jtx(), type = Type},
+    St = #st{
+        jtx = couch_jobs_fdb:get_jtx(),
+        type = Type,
+        batch_size = init_batch_size()
+    },
     {ok, schedule_check(St)}.
 
 
@@ -98,19 +106,12 @@ code_change(_OldVsn, St, _Extra) ->
 % Private helper functions
 
 check_activity(#st{jtx = JTx, type = Type, vs = not_found} = St) ->
-    NewVS = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
-        couch_jobs_fdb:get_activity_vs(JTx1, Type)
-    end),
-    St#st{vs = NewVS};
-
-check_activity(#st{jtx = JTx, type = Type, vs = VS} = St) ->
-    NewVS = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
-        NewVS = couch_jobs_fdb:get_activity_vs(JTx1, Type),
-        JobIds = couch_jobs_fdb:get_inactive_since(JTx1, Type, VS),
-        couch_jobs_fdb:re_enqueue_inactive(JTx1, Type, JobIds),
-        NewVS
-    end),
-    St#st{vs = NewVS}.
+    St#st{vs = get_activity_vs(JTx, Type)};
+
+check_activity(#st{} = St) ->
+    #st{jtx = JTx, type = Type, vs = VS, batch_size = BatchSize} = St,
+    NewBatchSize = re_enqueue_inactive(JTx, Type, VS, BatchSize),
+    St#st{vs = get_activity_vs(JTx, Type), batch_size = NewBatchSize}.
 
 
 get_timeout_msec(JTx, Type) ->
@@ -139,6 +140,53 @@ schedule_check(#st{jtx = JTx, type = Type, timeout = OldTimeout} = St) ->
     St1#st{tref = erlang:send_after(Wait, self(), check_activity)}.
 
 
+re_enqueue_inactive(JTx, Type, VS, BatchSize) ->
+    Result = try
+        couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+            Opts = [{limit, BatchSize}],
+            JobIds = couch_jobs_fdb:get_inactive_since(JTx1, Type, VS, Opts),
+            couch_jobs_fdb:re_enqueue_inactive(JTx1, Type, JobIds),
+            length(JobIds)
+        end)
+    catch
+        error:{erlfdb_error, ?ERLFDB_TRANSACTION_TOO_LARGE} ->
+            failed;
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
+            failed
+    end,
+    case Result of
+        JobCnt when is_integer(JobCnt), JobCnt < BatchSize ->
+            BatchSize;
+        JobCnt when is_integer(JobCnt), JobCnt >= BatchSize ->
+            NewBatchSize = BatchSize + batch_increment(),
+            re_enqueue_inactive(JTx, Type, VS, NewBatchSize);
+        failed ->
+            NewBatchSize = max(1, round(BatchSize * batch_factor())),
+            re_enqueue_inactive(JTx, Type, VS, NewBatchSize)
+    end.
+
+
+get_activity_vs(JTx, Type) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_activity_vs(JTx1, Type)
+    end).
+
+
 get_max_jitter_msec()->
-    config:get_integer("couch_jobs", "activity_monitor_max_jitter_msec",
+    couch_jobs_util:get_non_neg_int(activity_monitor_max_jitter_msec,
         ?MAX_JITTER_DEFAULT).
+
+
+init_batch_size() ->
+    couch_jobs_util:get_non_neg_int(activity_monitor_init_batch_size,
+        ?INIT_BATCH_SIZE).
+
+
+batch_increment() ->
+    couch_jobs_util:get_non_neg_int(activity_monitor_batch_increment,
+        ?BATCH_INCREMENT).
+
+
+batch_factor() ->
+    couch_jobs_util:get_float_0_1(activity_monitor_batch_factor,
+        ?BATCH_FACTOR).
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
index 27131ec..cea1388 100644
--- a/src/couch_jobs/src/couch_jobs_fdb.erl
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -33,8 +33,8 @@
 
     get_activity_vs/2,
     get_activity_vs_and_watch/2,
-    get_active_since/3,
-    get_inactive_since/3,
+    get_active_since/4,
+    get_inactive_since/4,
     re_enqueue_inactive/3,
 
     init_cache/0,
@@ -356,26 +356,26 @@ get_activity_vs_and_watch(#{jtx := true} = JTx, Type) ->
     end.
 
 
-get_active_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+get_active_since(#{jtx := true} = JTx, Type, Versionstamp, Opts) ->
     #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
     Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
     StartKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
     StartKeySel = erlfdb_key:first_greater_or_equal(StartKey),
     {_, EndKey} = erlfdb_tuple:range({Type}, Prefix),
-    Opts = [{streaming_mode, want_all}],
     Future = erlfdb:get_range(Tx, StartKeySel, EndKey, Opts),
-    maps:from_list(lists:map(fun({_K, V}) ->
-        erlfdb_tuple:unpack(V)
-    end, erlfdb:wait(Future))).
+    {JobIdsData, LastSeq} = lists:mapfoldl(fun({K, V}, _PrevSeq) ->
+        {Type, Seq} = erlfdb_tuple:unpack(K, Prefix),
+        {erlfdb_tuple:unpack(V), Seq}
+    end, Versionstamp, erlfdb:wait(Future)),
+    {maps:from_list(JobIdsData), LastSeq}.
 
 
-get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp, Opts) ->
     #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
     Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
     {StartKey, _} = erlfdb_tuple:range({Type}, Prefix),
     EndKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
     EndKeySel = erlfdb_key:first_greater_than(EndKey),
-    Opts = [{streaming_mode, want_all}],
     Future = erlfdb:get_range(Tx, StartKey, EndKeySel, Opts),
     lists:map(fun({_K, V}) ->
         {JobId, _} = erlfdb_tuple:unpack(V),
diff --git a/src/couch_jobs/src/couch_jobs_notifier.erl b/src/couch_jobs/src/couch_jobs_notifier.erl
index db5d5e0..37faf90 100644
--- a/src/couch_jobs/src/couch_jobs_notifier.erl
+++ b/src/couch_jobs/src/couch_jobs_notifier.erl
@@ -35,9 +35,11 @@
 -include("couch_jobs.hrl").
 
 
--define(TYPE_MONITOR_HOLDOFF_DEFAULT, 50).
+-define(TYPE_MONITOR_HOLDOFF_DEFAULT, "50").
 -define(TYPE_MONITOR_TIMEOUT_DEFAULT, "infinity").
--define(GET_JOBS_RANGE_RATIO, 0.5).
+-define(INIT_BATCH_SIZE, "1000").
+-define(BATCH_FACTOR, "0.75").
+-define(BATCH_INCREMENT, "100").
 
 
 -record(st, {
@@ -46,7 +48,8 @@
     monitor_pid,
     subs, % #{JobId => #{Ref => {Pid, State, Seq}}}
     pidmap, % #{{Jobid, Pid} => Ref}
-    refmap % #{Ref => JobId}
+    refmap, % #{Ref => JobId}
+    batch_size
 }).
 
 
@@ -76,7 +79,8 @@ init([Type]) ->
         type = Type,
         subs = #{},
         pidmap = #{},
-        refmap = #{}
+        refmap = #{},
+        batch_size = init_batch_size()
     },
     VS = get_type_vs(St),
     HoldOff = get_holdoff(),
@@ -199,31 +203,44 @@ flush_type_updated_messages(VSMax) ->
     end.
 
 
-get_jobs(#st{jtx = JTx, type = Type}, InactiveIdMap, Ratio)
-        when Ratio >= ?GET_JOBS_RANGE_RATIO ->
-    Filter = fun(JobId) -> maps:is_key(JobId, InactiveIdMap) end,
-    JobMap = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
-        couch_jobs_fdb:get_jobs(JTx1, Type, Filter)
-    end),
-    maps:map(fun(JobId, _) ->
-        case maps:is_key(JobId, JobMap) of
-            true -> maps:get(JobId, JobMap);
-            false -> {null, not_found, not_found}
-        end
-    end, InactiveIdMap);
-
-get_jobs(#st{jtx = JTx, type = Type}, InactiveIdMap, _) ->
-    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
-        maps:map(fun(JobId, _) ->
-            Job = #{job => true, type => Type, id => JobId},
-            case couch_jobs_fdb:get_job_state_and_data(JTx1, Job) of
-                {ok, Seq, State, Data} ->
-                    {Seq, State, Data};
-                {error, not_found} ->
-                    {null, not_found, not_found}
-            end
-        end, InactiveIdMap)
-    end).
+get_jobs(#st{} = St, Ids) when is_list(Ids) ->
+    #st{jtx = JTx, type = Type, batch_size = BatchSize} = St,
+    {Jobs, NewBatchSize} = get_jobs_iter(JTx, Type, Ids, BatchSize, #{}),
+    {Jobs, St#st{batch_size = NewBatchSize}}.
+
+
+get_jobs_iter(_Jtx, _Type, [], BatchSize, #{} = Acc) ->
+    {Acc, BatchSize};
+
+get_jobs_iter(JTx, Type, Ids, BatchSize, #{} = Acc0) ->
+    {BatchIds, RestIds} = case length(Ids) < BatchSize of
+        true -> {Ids, []};
+        false -> lists:split(BatchSize, Ids)
+    end,
+    Result = try
+        couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+            lists:foldl(fun(JobId, #{} = Acc) ->
+                Job = #{job => true, type => Type, id => JobId},
+                case couch_jobs_fdb:get_job_state_and_data(JTx1, Job) of
+                    {ok, Seq, State, Data} ->
+                        Acc#{JobId => {Seq, State, Data}};
+                    {error, not_found} ->
+                        Acc#{JobId => {null, not_found, not_found}}
+                end
+            end, Acc0, BatchIds)
+        end)
+    catch
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
+            failed
+    end,
+    case Result of
+        #{} = AccF ->
+            NewBatchSize = BatchSize + batch_increment(),
+            get_jobs_iter(JTx, Type, RestIds, NewBatchSize, AccF);
+        failed ->
+            NewBatchSize = max(1, round(BatchSize * batch_factor())),
+            get_jobs_iter(JTx, Type, Ids, NewBatchSize, Acc0)
+    end.
 
 
 get_type_vs(#st{jtx = JTx, type = Type}) ->
@@ -236,27 +253,47 @@ get_type_vs(#st{jtx = JTx, type = Type}) ->
 % and updated at least once since the given versionstamp. These are relatively
 % cheap to find as it's just a range read in the ?ACTIVITY subspace.
 %
-get_active_since(#st{} = _St, not_found) ->
-    #{};
-
-get_active_since(#st{jtx = JTx, type = Type, subs = Subs}, VS) ->
-    AllUpdated = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
-        couch_jobs_fdb:get_active_since(JTx1, Type, VS)
-    end),
-    maps:map(fun(_JobId, Data) ->
+get_active_since(#st{} = St, not_found) ->
+    {#{}, St};
+
+get_active_since(#st{} = St, VS) ->
+    #st{jtx = JTx, type = Type, subs = Subs, batch_size = BatchSize} = St,
+    {Updated, NewBatchSize} = get_active_iter(JTx, Type, VS, BatchSize, #{}),
+    UpdatedSubs = maps:map(fun(_JobId, Data) ->
         {VS, running, Data}
-    end, maps:with(maps:keys(Subs), AllUpdated)).
+    end, maps:with(maps:keys(Subs), Updated)),
+    {UpdatedSubs, St#st{batch_size = NewBatchSize}}.
+
+
+get_active_iter(JTx, Type, VS, BatchSize, #{} = Acc) ->
+    Opts = [{limit, BatchSize}],
+    Result = try
+        couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+            couch_jobs_fdb:get_active_since(JTx1, Type, VS, Opts)
+        end)
+    catch
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
+            failed
+    end,
+    case Result of
+        {Updated, _FinalSeq} when map_size(Updated) < BatchSize ->
+            {maps:merge(Acc, Updated), BatchSize};
+        {Updated, FinalSeq} when map_size(Updated) >= BatchSize ->
+            Acc1 = maps:merge(Acc, Updated),
+            NewBatchSize = BatchSize + batch_increment(),
+            NextSeq = fabric2_fdb:next_vs(FinalSeq),
+            get_active_iter(JTx, Type, NextSeq, NewBatchSize, Acc1);
+        failed ->
+            NewBatchSize = max(1, round(BatchSize * batch_factor())),
+            get_active_iter(JTx, Type, VS, NewBatchSize, Acc)
+    end.
 
 
 try_notify_subscribers(ActiveVS, #st{} = St) ->
     try
         notify_subscribers(ActiveVS, St)
     catch
-        error:{timeout, _} ->
-            try_notify_subscribers(ActiveVS, St);
-        error:{erlfdb_error, ?ERLFDB_TRANSACTION_TIMED_OUT} ->
-            try_notify_subscribers(ActiveVS, St);
-        error:{erlfdb_error, Code} when ?ERLFDB_IS_RETRYABLE(Code) ->
+        error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
             try_notify_subscribers(ActiveVS, St)
     end.
 
@@ -267,14 +304,13 @@ notify_subscribers(_, #st{subs = Subs} = St) when map_size(Subs) =:= 0 ->
 notify_subscribers(ActiveVS, #st{} = St1) ->
     % First gather the easy (cheap) active jobs. Then with those out of way
     % inspect each job to get its state.
-    Active = get_active_since(St1, ActiveVS),
-    St2 = notify_job_ids(Active, St1),
+    {Active, St2} = get_active_since(St1, ActiveVS),
+    St3 = notify_job_ids(Active, St2),
     ActiveIds = maps:keys(Active),
-    Subs = St2#st.subs,
-    InactiveIdMap = maps:without(ActiveIds, Subs),
-    InactiveRatio = maps:size(InactiveIdMap) / maps:size(Subs),
-    Inactive = get_jobs(St2, InactiveIdMap, InactiveRatio),
-    notify_job_ids(Inactive, St2).
+    Subs = St3#st.subs,
+    InactiveIds = maps:keys(maps:without(ActiveIds, Subs)),
+    {Inactive, St4}  = get_jobs(St3, InactiveIds),
+    notify_job_ids(Inactive, St4).
 
 
 notify_job_ids(#{} = Jobs, #st{type = Type} = St0) ->
@@ -306,13 +342,25 @@ notify(Pid, Ref, Type, Id, State, Data) ->
 
 
 get_holdoff() ->
-    config:get_integer("couch_jobs", "type_monitor_holdoff_msec",
+    couch_jobs_util:get_non_neg_int(type_monitor_holdoff_msec,
         ?TYPE_MONITOR_HOLDOFF_DEFAULT).
 
 
 get_timeout() ->
-    Default =  ?TYPE_MONITOR_TIMEOUT_DEFAULT,
-    case config:get("couch_jobs", "type_monitor_timeout_msec", Default) of
-        "infinity" -> infinity;
-        Milliseconds -> list_to_integer(Milliseconds)
-    end.
+    couch_jobs_util:get_timeout(type_monitor_timeout_msec,
+        ?TYPE_MONITOR_TIMEOUT_DEFAULT).
+
+
+init_batch_size() ->
+    couch_jobs_util:get_non_neg_int(notifier_init_batch_size,
+        ?INIT_BATCH_SIZE).
+
+
+batch_increment() ->
+    couch_jobs_util:get_non_neg_int(notifier_batch_increment,
+        ?BATCH_INCREMENT).
+
+
+batch_factor() ->
+    couch_jobs_util:get_float_0_1(notifier_batch_factor,
+        ?BATCH_FACTOR).
diff --git a/src/couch_jobs/src/couch_jobs_server.erl b/src/couch_jobs/src/couch_jobs_server.erl
index e4805d4..18a0cb7 100644
--- a/src/couch_jobs/src/couch_jobs_server.erl
+++ b/src/couch_jobs/src/couch_jobs_server.erl
@@ -34,8 +34,8 @@
 ]).
 
 
--define(TYPE_CHECK_PERIOD_DEFAULT, 15000).
--define(MAX_JITTER_DEFAULT, 5000).
+-define(TYPE_CHECK_PERIOD_DEFAULT, "15000").
+-define(MAX_JITTER_DEFAULT, "5000").
 
 
 start_link() ->
@@ -188,10 +188,10 @@ schedule_check() ->
 
 
 get_period_msec() ->
-    config:get_integer("couch_jobs", "type_check_period_msec",
+    couch_jobs_util:get_non_neg_int(type_check_period_msec,
         ?TYPE_CHECK_PERIOD_DEFAULT).
 
 
 get_max_jitter_msec() ->
-    config:get_integer("couch_jobs", "type_check_max_jitter_msec",
+    couch_jobs_util:get_non_neg_int(type_check_max_jitter_msec,
         ?MAX_JITTER_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_util.erl b/src/couch_jobs/src/couch_jobs_util.erl
new file mode 100644
index 0000000..6eaad83
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_util.erl
@@ -0,0 +1,59 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_util).
+
+
+-export([
+    get_non_neg_int/2,
+    get_float_0_1/2,
+    get_timeout/2
+]).
+
+
+
+get_non_neg_int(Key, Default) when is_atom(Key), is_list(Default) ->
+    StrVal = config:get("couch_jobs", atom_to_list(Key), Default),
+    non_neg_int(Key, StrVal).
+
+
+get_float_0_1(Key, Default) when is_atom(Key), is_list(Default) ->
+    StrVal = config:get("couch_jobs", atom_to_list(Key), Default),
+    float_0_1(Key, StrVal).
+
+
+get_timeout(Key, Default) when is_atom(Key), is_list(Default) ->
+    case config:get("couch_jobs", atom_to_list(Key), Default) of
+        "infinity" -> infinity;
+        StrVal -> non_neg_int(Key, StrVal)
+    end.
+
+
+non_neg_int(Name, Str) ->
+    try
+        Val = list_to_integer(Str),
+        true = Val > 0,
+        Val
+    catch _:_ ->
+        erlang:error({invalid_non_neg_integer, {couch_jobs, Name, Str}})
+    end.
+
+
+float_0_1(Name, Str) ->
+    Val = try
+        list_to_float(Str)
+    catch error:badarg ->
+        erlang:error({invalid_float, {couch_jobs, Name, Str}})
+    end,
+    if Val >= 0.0 andalso Val =< 1.0 -> Val; true ->
+        erlang:error({float_out_of_range, {couch_jobs, Name, Str}})
+    end.
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
index 3582c64..d006672 100644
--- a/src/couch_jobs/test/couch_jobs_tests.erl
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -77,7 +77,34 @@ couch_jobs_basic_test_() ->
     }.
 
 
+couch_jobs_batching_test_() ->
+    {
+        "Test couch jobs batching logic",
+        {
+            setup,
+            fun setup_couch/0, fun teardown_couch/1,
+            {
+                foreach,
+                fun setup_batch/0, fun teardown_batch/1,
+                [
+                    ?TDEF_FE(accept_blocking),
+                    ?TDEF_FE(resubmit_enqueues_job),
+                    ?TDEF_FE(accept_max_schedtime),
+                    ?TDEF_FE(accept_no_schedule),
+                    ?TDEF_FE(subscribe),
+                    ?TDEF_FE(remove_when_subscribed_and_pending),
+                    ?TDEF_FE(remove_when_subscribed_and_running),
+                    ?TDEF_FE(subscribe_wait_multiple),
+                    ?TDEF_FE(enqueue_inactive, 15)
+                ]
+            }
+        }
+    }.
+
+
 setup_couch() ->
+    meck:new(couch_jobs_fdb, [passthrough]),
+    meck:new(couch_jobs_util, [passthrough]),
     % Because of a circular dependency between `couch_jobs` and `fabric` in
     % `fabric2_db_expiration` module, disable db expiration so when
     % `couch_jobs` is stopped the test, `fabric` app doesn't get torn down as
@@ -122,6 +149,54 @@ teardown(#{}) ->
     ok.
 
 
+setup_batch() ->
+    Ctx = setup(),
+
+    % Simulate having too many jobs to fit in a 10Mb
+    meck:expect(couch_jobs_fdb, re_enqueue_inactive, 3, meck:loop([
+        meck:raise(error, {erlfdb_error, 2101}),
+        meck:passthrough()
+    ])),
+
+    % Simulate get_inactive_since GRV timing out
+    meck:expect(couch_jobs_fdb, get_inactive_since, 4, meck:loop([
+        meck:raise(error, {erlfdb_error, 1007}),
+        meck:passthrough()
+    ])),
+
+    % Simulate get_active_since transaction timing out
+    meck:expect(couch_jobs_fdb, get_active_since, 4, meck:loop([
+        meck:raise(error, {erlfdb_error, 1031}),
+        meck:passthrough()
+    ])),
+
+    % Set up batching parameters to test small batches down to size 1
+    meck:expect(couch_jobs_util, get_non_neg_int, [
+        {[notifier_batch_increment, '_'], 1},
+        {[activity_monitor_batch_increment, '_'], 1},
+        {2, meck:passthrough()}
+    ]),
+    meck:expect(couch_jobs_util, get_float_0_1, [
+        {[notifier_batch_factor, '_'], 0.0001},
+        {[activity_monitor_batch_factor, '_'], 0.0001},
+        {2, meck:passthrough()}
+    ]),
+
+    Ctx.
+
+
+teardown_batch(Ctx) ->
+    teardown(Ctx),
+    meck:reset(couch_jobs_fdb),
+    meck:reset(couch_jobs_util),
+    meck:expect(couch_jobs_fdb, re_enqueue_inactive, 3, meck:passthrough()),
+    meck:expect(couch_jobs_fdb, get_active_since, 4, meck:passthrough()),
+    meck:expect(couch_jobs_fdb, get_inactive_since, 4, meck:passthrough()),
+    meck:expect(couch_jobs_util, get_non_neg_int, 2, meck:passthrough()),
+    meck:expect(couch_jobs_util, get_float_0_1, 2, meck:passthrough()),
+    ok.
+
+
 clear_jobs() ->
     couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
         #{jobs_path := Jobs, tx := Tx} = JTx,