You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/05/24 18:01:19 UTC

[couchdb] 01/01: CouchDB background jobs WIP3

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/rfc-couch-jobs
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit db4d1dc6ba2b7830fd8542033710652f1b787e35
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Thu May 16 19:11:50 2019 -0400

    CouchDB background jobs WIP3
    
    So far got:
     - Main API module (couch_jobs.erl)
     - Supervisor and app structures
     - FDB read/write code in couch_jobs_fdb.erl
     - All jobs creation API: add(), remove(), submit(), get_job()
     - Workers API: finish(), resubmit(), update() and accept()
     - Directory path caching
     - Metadata changes check
     - Activity monitor
     - Type monitor (start notifiers and activity monitors when new types detected)
     - Job state subscription: subscribe(), unsubscribe()
    
    Still need:
     - An example worker
     - Tests
---
 rebar.config.script                                |   1 +
 rel/reltool.config                                 |   2 +
 src/couch_jobs/.gitignore                          |   4 +
 src/couch_jobs/README.md                           |   5 +
 src/couch_jobs/src/couch_jobs.app.src              |  28 ++
 src/couch_jobs/src/couch_jobs.erl                  | 185 ++++++++
 src/couch_jobs/src/couch_jobs.hrl                  |  40 ++
 src/couch_jobs/src/couch_jobs_activity_monitor.erl | 131 ++++++
 .../src/couch_jobs_activity_monitor_sup.erl        |  57 +++
 src/couch_jobs/src/couch_jobs_app.erl              |  26 ++
 src/couch_jobs/src/couch_jobs_fdb.erl              | 469 +++++++++++++++++++++
 src/couch_jobs/src/couch_jobs_notifier.erl         | 210 +++++++++
 src/couch_jobs/src/couch_jobs_notifier_sup.erl     |  57 +++
 src/couch_jobs/src/couch_jobs_pending.erl          | 150 +++++++
 src/couch_jobs/src/couch_jobs_server.erl           | 153 +++++++
 src/couch_jobs/src/couch_jobs_sup.erl              |  66 +++
 src/couch_jobs/src/couch_jobs_type_monitor.erl     |  81 ++++
 17 files changed, 1665 insertions(+)

diff --git a/rebar.config.script b/rebar.config.script
index 3b58bcb..2def724 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -76,6 +76,7 @@ SubDirs = [
     "src/couch_tests",
     "src/ddoc_cache",
     "src/fabric",
+    "src/couch_jobs",
     "src/global_changes",
     "src/mango",
     "src/rexi",
diff --git a/rel/reltool.config b/rel/reltool.config
index 1051d2e..afebc44 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -34,6 +34,7 @@
         couch,
         couch_epi,
         couch_index,
+        couch_jobs,
         couch_log,
         couch_mrview,
         couch_plugins,
@@ -90,6 +91,7 @@
     {app, config, [{incl_cond, include}]},
     {app, couch, [{incl_cond, include}]},
     {app, couch_epi, [{incl_cond, include}]},
+    {app, couch_jobs, [{incl_cond, include}]},
     {app, couch_index, [{incl_cond, include}]},
     {app, couch_log, [{incl_cond, include}]},
     {app, couch_mrview, [{incl_cond, include}]},
diff --git a/src/couch_jobs/.gitignore b/src/couch_jobs/.gitignore
new file mode 100644
index 0000000..6ef4c52
--- /dev/null
+++ b/src/couch_jobs/.gitignore
@@ -0,0 +1,4 @@
+*.beam
+.eunit
+ebin/couch_jobs.app
+.DS_Store
\ No newline at end of file
diff --git a/src/couch_jobs/README.md b/src/couch_jobs/README.md
new file mode 100644
index 0000000..b2910a5
--- /dev/null
+++ b/src/couch_jobs/README.md
@@ -0,0 +1,5 @@
+CouchDB Jobs Application
+=========================
+
+Run background jobs in CouchDB
+
diff --git a/src/couch_jobs/src/couch_jobs.app.src b/src/couch_jobs/src/couch_jobs.app.src
new file mode 100644
index 0000000..c9b8b2c
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.app.src
@@ -0,0 +1,28 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% OTP application resource file for couch_jobs.
+{application, couch_jobs, [
+    {description, "CouchDB Jobs"},
+    {vsn, git},
+    % Application callback module and its start argument
+    {mod, {couch_jobs_app, []}},
+    % Locally registered process names owned by this application
+    {registered, [
+        couch_jobs_sup
+    ]},
+    % Applications which must be started before couch_jobs
+    {applications, [
+        kernel,
+        stdlib,
+        erlfdb,
+        couch_log,
+        config,
+        fabric
+    ]}
+]}.
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
new file mode 100644
index 0000000..f13c707
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -0,0 +1,185 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs).
+
+-export([
+    add/3,
+    remove/2,
+    stop_and_remove/3,
+    resubmit/2,
+    resubmit/3,
+    get_job/2,
+
+    subscribe/2,
+    subscribe/3,
+    unsubscribe/1,
+
+    wait_job_state/2,
+    wait_job_state/3,
+
+    accept/1,
+    accept/2,
+    finish/5,
+    update/5
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+%% Job Creation API
+
+% Add a new job of the given Type under JobId.
+%
+% JobOpts must be a map which jiffy is able to encode. If validation raises,
+% `{error, {invalid_job_args, Tag, Err}}` is returned and nothing is written.
+% Otherwise the result of couch_jobs_fdb:add/4, run through
+% couch_jobs_fdb:tx/2, is returned.
+add(Type, JobId, JobOpts) ->
+    % Only the validation call is protected by the try. The transaction runs
+    % in the `of` section so its exceptions are not mislabeled as bad args.
+    try validate_jobopts(JobOpts) of
+        ok ->
+            couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+                couch_jobs_fdb:add(JTx, Type, JobId, JobOpts)
+            end)
+    catch
+        Tag:Err ->
+            {error, {invalid_job_args, Tag, Err}}
+    end.
+
+
+% Remove a job. Delegates to couch_jobs_fdb:remove/3 through
+% couch_jobs_fdb:tx/2 and returns whatever that call returns.
+remove(Type, JobId) ->
+    RemoveFun = fun(JTx) -> couch_jobs_fdb:remove(JTx, Type, JobId) end,
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), RemoveFun).
+
+
+% Remove a job and, if remove/2 reports it as `canceled`, wait up to Timeout
+% for the job to report the `finished` state.
+%
+% Returns `ok`, `not_found`, `timeout`, or `{error, Error}` when the notifier
+% server for Type could not be obtained.
+%
+% NOTE(review): the subscription created while waiting is not explicitly
+% released here; confirm whether the notifier drops it on its own.
+stop_and_remove(Type, JobId, Timeout) ->
+    case remove(Type, JobId) of
+        not_found -> not_found;
+        ok -> ok;
+        canceled -> wait_until_finished(Type, JobId, Timeout)
+    end.
+
+
+% Subscribe to the job and block until it reaches `finished` or Timeout
+% expires.
+wait_until_finished(Type, JobId, Timeout) ->
+    case subscribe(Type, JobId) of
+        not_found ->
+            not_found;
+        finished ->
+            ok;
+        {error, Error} ->
+            % subscribe/2 may fail to locate a notifier server
+            {error, Error};
+        {ok, SubId, _JobState} ->
+            case wait_job_state(SubId, finished, Timeout) of
+                timeout -> timeout;
+                {Type, JobId, finished} -> ok
+            end
+    end.
+
+
+% Mark a job for resubmission while keeping its current priority.
+%
+% couch_jobs_fdb only exports resubmit/4, so the priority currently stored in
+% the job options is read and passed back unchanged. Returns `not_found` if
+% the job does not exist, otherwise the result of couch_jobs_fdb:resubmit/4.
+resubmit(Type, JobId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        case couch_jobs_fdb:get_job(JTx, Type, JobId) of
+            {ok, JobOpts, _JobState} ->
+                Priority = maps:get(?OPT_PRIORITY, JobOpts, undefined),
+                couch_jobs_fdb:resubmit(JTx, Type, JobId, Priority);
+            not_found ->
+                not_found
+        end
+    end).
+
+
+% Mark a job for resubmission with NewPriority. Delegates to
+% couch_jobs_fdb:resubmit/4 through couch_jobs_fdb:tx/2.
+resubmit(Type, JobId, NewPriority) ->
+    ResubmitFun = fun(JTx) ->
+        couch_jobs_fdb:resubmit(JTx, Type, JobId, NewPriority)
+    end,
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), ResubmitFun).
+
+
+
+% Look up a job. Delegates to couch_jobs_fdb:get_job/3 through
+% couch_jobs_fdb:tx/2 and returns its result unchanged.
+get_job(Type, JobId) ->
+    GetFun = fun(JTx) -> couch_jobs_fdb:get_job(JTx, Type, JobId) end,
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), GetFun).
+
+%% Subscription API
+
+% Subscribe the calling process to state changes of a job.
+%
+% Returns `{ok, {Server, Ref}, JobState}` on success, `not_found` or
+% `finished` as reported by the notifier, or `{error, Error}` if no notifier
+% server could be obtained for Type.
+subscribe(Type, JobId) ->
+    case couch_jobs_server:get_notifier_server(Type) of
+        {error, Error} ->
+            {error, Error};
+        {ok, Server} ->
+            Reply = couch_jobs_notifier:subscribe(Server, JobId, self()),
+            case Reply of
+                not_found -> not_found;
+                finished -> finished;
+                {Ref, JobState} -> {ok, {Server, Ref}, JobState}
+            end
+    end.
+
+
+% Same as subscribe/2 but also hands an arity-4 Fun to the notifier.
+%
+% Returns `{ok, {Server, Ref}, JobState}`, `not_found`, `finished` or
+% `{error, Error}`.
+subscribe(Type, JobId, Fun) when is_function(Fun, 4) ->
+    case couch_jobs_server:get_notifier_server(Type) of
+        {error, Error} ->
+            {error, Error};
+        {ok, Server} ->
+            Subscriber = self(),
+            Reply = couch_jobs_notifier:subscribe(Server, JobId, Fun,
+                Subscriber),
+            case Reply of
+                not_found -> not_found;
+                finished -> finished;
+                {Ref, JobState} -> {ok, {Server, Ref}, JobState}
+            end
+    end.
+
+
+% Cancel a subscription returned by subscribe/2,3. Any notifications for Ref
+% already sitting in the caller's mailbox are flushed afterwards, even if the
+% call to the notifier raises.
+unsubscribe({Server, Ref}) when is_pid(Server), is_reference(Ref) ->
+    try couch_jobs_notifier:unsubscribe(Server, Ref) of
+        Result -> Result
+    after
+        flush_notifications(Ref)
+    end.
+
+
+% Block until the next job event for the subscription arrives, or Timeout
+% elapses. Returns `{Type, JobId, JobState}` or `timeout`.
+wait_job_state({_Server, SubRef}, Timeout) ->
+    receive
+        {?COUCH_JOBS_EVENT, SubRef, JobType, Id, State} ->
+            {JobType, Id, State}
+    after Timeout ->
+        timeout
+    end.
+
+
+% Block until an event carrying exactly the wanted state arrives for the
+% subscription, or Timeout elapses. Events with other states are left in the
+% mailbox. Returns `{Type, JobId, JobState}` or `timeout`.
+wait_job_state({_Server, SubRef}, Wanted, Timeout) ->
+    receive
+        {?COUCH_JOBS_EVENT, SubRef, JobType, Id, Wanted} ->
+            {JobType, Id, Wanted}
+    after Timeout ->
+        timeout
+    end.
+
+
+%% Worker Implementation API
+
+% Accept a job of the given Type without a maximum priority bound.
+accept(Type) ->
+    MaxPriority = undefined,
+    accept(Type, MaxPriority).
+
+
+% Accept a job of the given Type. Delegates to couch_jobs_fdb:accept/3
+% through couch_jobs_fdb:tx/2 and returns its result.
+accept(Type, MaxPriority) ->
+    AcceptFun = fun(JTx) ->
+        couch_jobs_fdb:accept(JTx, Type, MaxPriority)
+    end,
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), AcceptFun).
+
+
+% Worker API: finish a job held under WorkerLockId. Tx is handed to
+% couch_jobs_fdb:get_jtx/1 to build the jobs transaction context; the work
+% itself is done by couch_jobs_fdb:finish/5.
+finish(Tx, Type, JobId, JobOpts, WorkerLockId) ->
+    JTx0 = couch_jobs_fdb:get_jtx(Tx),
+    couch_jobs_fdb:tx(JTx0, fun(JTx) ->
+        couch_jobs_fdb:finish(JTx, Type, JobId, JobOpts, WorkerLockId)
+    end).
+
+
+% Worker API: update a job held under WorkerLockId. Tx is handed to
+% couch_jobs_fdb:get_jtx/1 to build the jobs transaction context; the work
+% itself is done by couch_jobs_fdb:update/5.
+update(Tx, Type, JobId, JobOpts, WorkerLockId) ->
+    JTx0 = couch_jobs_fdb:get_jtx(Tx),
+    couch_jobs_fdb:tx(JTx0, fun(JTx) ->
+        couch_jobs_fdb:update(JTx, Type, JobId, JobOpts, WorkerLockId)
+    end).
+
+
+%% Private utils
+
+% Check that JobOpts is a map jiffy can encode. Returns `ok`; raises if the
+% argument is not a map or if jiffy:encode/1 raises.
+validate_jobopts(JobOpts) when is_map(JobOpts) ->
+    _ = jiffy:encode(JobOpts),
+    ok.
+
+
+% Drain every job event for Ref currently in the caller's mailbox without
+% blocking. Always returns `ok`.
+flush_notifications(SubRef) ->
+    receive
+        {?COUCH_JOBS_EVENT, SubRef, _Type, _JobId, _JobState} ->
+            flush_notifications(SubRef)
+    after 0 ->
+        ok
+    end.
diff --git a/src/couch_jobs/src/couch_jobs.hrl b/src/couch_jobs/src/couch_jobs.hrl
new file mode 100644
index 0000000..a9e831a
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.hrl
@@ -0,0 +1,40 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+% JobOpts field definitions
+%
+-define(OPT_PRIORITY, <<"priority">>).
+-define(OPT_DATA, <<"data">>).
+-define(OPT_CANCEL, <<"cancel">>).
+-define(OPT_RESUBMIT, <<"resubmit">>).
+
+% These might be in a fabric public .hrl eventually
+%
+-define(uint2bin(I), binary:encode_unsigned(I, little)).
+-define(bin2uint(I), binary:decode_unsigned(I, little)).
+-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}).
+-define(METADATA_VERSION_KEY, <<"$metadata_version_key$">>).
+
+% Data model definitions
+% Switch these to numbers eventually.
+%
+-define(JOBS, <<"couch_jobs">>).
+-define(DATA, <<"data">>).
+-define(PENDING, <<"pending">>).
+-define(WATCHES, <<"watches">>).
+-define(ACTIVITY_TIMEOUT, <<"activity_timeout">>).
+-define(ACTIVITY, <<"activity">>).
+
+
+% Couch jobs event notifier tag
+-define(COUCH_JOBS_EVENT, '$couch_jobs_event').
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor.erl b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
new file mode 100644
index 0000000..1df4329
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
@@ -0,0 +1,131 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_activity_monitor).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/1
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+-record(st, {
+    jtx,
+    type,
+    tref,
+    timeout = 0,
+    vs = null
+}).
+
+
+-define(MAX_JITTER_DEFAULT, 15000).
+
+
+% Start an activity monitor for one job Type.
+%
+% The process is deliberately not registered under a local name: monitors
+% are started once per type (see couch_jobs_activity_monitor_sup), so a
+% shared registered name would make every start after the first one fail
+% with `already_started`.
+start_link(Type) ->
+    gen_server:start_link(?MODULE, [Type], []).
+
+
+%% gen_server callbacks
+
+% Build the initial state for Type and schedule the first activity check.
+init([Type]) ->
+    JTx = couch_jobs_fdb:get_jtx(),
+    St0 = #st{jtx = JTx, type = Type},
+    St1 = schedule_check(St0),
+    {ok, St1}.
+
+
+% Nothing to clean up on shutdown.
+terminate(_Reason, _St) ->
+    ok.
+
+
+% No calls are supported; any call stops the server and the caller receives
+% `{bad_call, Msg}`.
+handle_call(Msg, _From, St) ->
+    Bad = {bad_call, Msg},
+    {stop, Bad, Bad, St}.
+
+
+% No casts are supported; any cast stops the server with `{bad_cast, Msg}`.
+handle_cast(Msg, St) ->
+    Reason = {bad_cast, Msg},
+    {stop, Reason, St}.
+
+
+% `check_activity` is the timer message armed by schedule_check/1: run one
+% activity check and arm the next timer. Any other message stops the server.
+handle_info(check_activity, St0) ->
+    {noreply, schedule_check(check_activity(St0))};
+
+handle_info(Msg, St) ->
+    Reason = {bad_info, Msg},
+    {stop, Reason, St}.
+
+
+% State is carried across code upgrades unchanged.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+% Run one activity check.
+%
+% With no versionstamp recorded yet (`vs = null`) only the current one is
+% fetched. Otherwise the jobs reported by get_inactive_since/1 for the stored
+% versionstamp are re-enqueued, and the state is advanced to the versionstamp
+% read at the start of this check.
+check_activity(#st{vs = null} = St) ->
+    St#st{vs = get_vs(St)};
+
+check_activity(#st{} = St) ->
+    NewVS = get_vs(St),
+    case get_inactive_since(St) of
+        [] ->
+            ok;
+        [_ | _] ->
+            % re_enqueue_inactive/1 returns the list of job ids, not a state
+            % record, so its result must not replace St.
+            _ = re_enqueue_inactive(St),
+            ok
+    end,
+    St#st{vs = NewVS}.
+
+
+% Read the activity timeout for Type via couch_jobs_fdb:get_activity_timeout/2
+% and convert it from seconds to milliseconds.
+get_timeout_msec(JTx, Type) ->
+    ReadFun = fun(JTx1) ->
+        couch_jobs_fdb:get_activity_timeout(JTx1, Type)
+    end,
+    Seconds = couch_jobs_fdb:tx(JTx, ReadFun),
+    timer:seconds(Seconds).
+
+
+% Arm the timer for the next `check_activity` message.
+%
+% The timeout is re-read on every call. If it changed, the stored
+% versionstamp is reset so the next check starts a fresh observation window.
+% A random jitter of 1..MaxJitter msec is added, where MaxJitter is the
+% configured maximum capped by the timeout itself.
+schedule_check(#st{jtx = JTx, type = Type, timeout = OldTimeout} = St) ->
+    % Reset versionstamp if timeout changed.
+    St1 = case get_timeout_msec(JTx, Type) of
+        OldTimeout -> St;
+        NewTimeout -> St#st{vs = null, timeout = NewTimeout}
+    end,
+    #st{timeout = Timeout} = St1,
+    MaxJitter = min(Timeout, get_max_jitter_msec()),
+    % rand:uniform/1 needs an argument >= 1
+    Wait = Timeout + rand:uniform(max(1, MaxJitter)),
+    St1#st{tref = erlang:send_after(Wait, self(), check_activity)}.
+
+
+% Fetch the current activity versionstamp value for the monitored type.
+get_vs(#st{jtx = JTx, type = Type}) ->
+    ReadFun = fun(JTx1) -> couch_jobs_fdb:get_activity_vs(JTx1, Type) end,
+    couch_jobs_fdb:tx(JTx, ReadFun).
+
+
+% Re-enqueue jobs of the monitored type with respect to the stored
+% versionstamp. Returns the result of couch_jobs_fdb:re_enqueue_inactive/3.
+re_enqueue_inactive(#st{jtx = JTx, type = Type, vs = VS}) ->
+    EnqueueFun = fun(JTx1) ->
+        couch_jobs_fdb:re_enqueue_inactive(JTx1, Type, VS)
+    end,
+    couch_jobs_fdb:tx(JTx, EnqueueFun).
+
+
+% List job ids of the monitored type which couch_jobs_fdb reports as
+% inactive with respect to the stored versionstamp.
+%
+% NOTE(review): couch_jobs_fdb:get_inactive_since/3 must be exported for this
+% remote call to succeed.
+get_inactive_since(#st{jtx = JTx, type = Type, vs = VS}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_inactive_since(JTx1, Type, VS)
+    end).
+
+
+% Maximum scheduling jitter in msec, read from the "couch_jobs" config
+% section with ?MAX_JITTER_DEFAULT as the fallback.
+get_max_jitter_msec() ->
+    Section = "couch_jobs",
+    Key = "activity_monitor_max_jitter_msec",
+    config:get_integer(Section, Key, ?MAX_JITTER_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl
new file mode 100644
index 0000000..943926e
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl
@@ -0,0 +1,57 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_activity_monitor_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0,
+
+    start_monitor/1,
+    stop_monitor/1
+]).
+
+-export([
+    init/1
+]).
+
+
+% Start the supervisor, registered locally under the module name.
+start_link() ->
+    SupName = {local, ?MODULE},
+    supervisor:start_link(SupName, ?MODULE, []).
+
+
+% Start an activity monitor child for Type. Type is passed as the extra
+% argument appended to the child start function.
+start_monitor(Type) ->
+    ExtraArgs = [Type],
+    supervisor:start_child(?MODULE, ExtraArgs).
+
+
+% Terminate the monitor child identified by its pid.
+stop_monitor(MonitorPid) ->
+    supervisor:terminate_child(?MODULE, MonitorPid).
+
+
+% simple_one_for_one supervisor: children are couch_jobs_activity_monitor
+% processes added dynamically, and they are not restarted (temporary).
+init(_Args) ->
+    ChildSpec = #{
+        id => couch_jobs_monitor,
+        restart => temporary,
+        start => {couch_jobs_activity_monitor, start_link, []}
+    },
+    SupFlags = #{
+        strategy => simple_one_for_one,
+        intensity => 10,
+        period => 3
+    },
+    {ok, {SupFlags, [ChildSpec]}}.
diff --git a/src/couch_jobs/src/couch_jobs_app.erl b/src/couch_jobs/src/couch_jobs_app.erl
new file mode 100644
index 0000000..720b948
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_app.erl
@@ -0,0 +1,26 @@
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_app).
+
+
+-behaviour(application).
+
+
+-export([
+    start/2,
+    stop/1
+]).
+
+
+% Application start callback: start the top-level supervisor.
+start(_StartType, []) ->
+    couch_jobs_sup:start_link().
+
+
+% Application stop callback: nothing to clean up.
+stop([] = _State) ->
+    ok.
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
new file mode 100644
index 0000000..e0cfad3
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -0,0 +1,469 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_fdb).
+
+
+-export([
+    add/4,
+    remove/3,
+    resubmit/4,
+    get_job/3,
+
+    accept/2,
+    accept/3,
+    finish/5,
+    update/5,
+
+    set_activity_timeout/3,
+    get_activity_timeout/2,
+
+    get_types/1,
+
+    get_activity_vs/2,
+    get_activity_vs_and_watch/2,
+    get_active_since/3,
+    % called remotely by couch_jobs_activity_monitor
+    get_inactive_since/3,
+    re_enqueue_inactive/3,
+
+    clear_type/2,
+
+    init_cache/0,
+
+    get_jtx/0,
+    get_jtx/1,
+
+    tx/2
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+% Data model
+%
+% (?JOBS, ?DATA, Type, JobId) = (Sequence, WorkerLockId, Priority, JobOpts)
+% (?JOBS, ?PENDING, Type, Priority, JobId) = ""
+% (?JOBS, ?WATCHES, Type) = Sequence
+% (?JOBS, ?ACTIVITY_TIMEOUT, Type) = ActivityTimeout
+% (?JOBS, ?ACTIVITY, Type, Sequence) = JobId
+
+% Job creation API
+
+% Store a new job unless one already exists under (Type, JobId).
+% Returns `{error, duplicate_job}` if the data key is already present,
+% otherwise the result of maybe_enqueue/4.
+add(#{jtx := true} = JTx0, Type, JobId, JobOpts) ->
+    JTx = get_jtx(JTx0),
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    Existing = erlfdb:wait(erlfdb:get(Tx, Key)),
+    case Existing of
+        not_found ->
+            maybe_enqueue(JTx, Type, JobId, JobOpts);
+        <<_/binary>> ->
+            {error, duplicate_job}
+    end.
+
+
+% Remove a job or, if a worker currently holds it, flag it as canceled.
+%
+% Returns:
+%   `canceled`  - the job has a worker lock; the cancel option was set
+%   `ok`        - the job data (and its pending entry, if any) was cleared
+%   `not_found` - no such job
+remove(#{jtx := true} = JTx0, Type, JobId) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx = get_jtx(JTx0),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    % get_job/2 returns {Seq, WorkerLockId, Priority, JobOpts}
+    case get_job(Tx, Key) of
+        {_, WorkerLockId, _, _} = Job when WorkerLockId =/= null ->
+            ok = cancel(JTx, Key, Job),
+            canceled;
+        {_, _, null, _} ->
+            % Neither locked nor pending: only the data entry exists
+            erlfdb:clear(Tx, Key),
+            ok;
+        {_, _, Priority, _} ->
+            couch_jobs_pending:remove(JTx, Type, Priority, JobId),
+            erlfdb:clear(Tx, Key),
+            ok;
+        not_found ->
+            not_found
+    end.
+
+
+% Set the resubmit option on a job and record NewPriority in its options.
+% Returns `ok`, or `not_found` if the job does not exist. A job which
+% already carries the resubmit option is left untouched.
+resubmit(#{jtx := true} = JTx, Type, JobId, NewPriority) ->
+    #{tx := Tx, jobs_path :=  Jobs} = get_jtx(JTx),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job(Tx, Key) of
+        not_found ->
+            not_found;
+        {_, _, _, #{?OPT_RESUBMIT := true}} ->
+            ok;
+        {Seq, WorkerLockId, Priority, #{} = JobOpts} ->
+            Flagged = JobOpts#{?OPT_RESUBMIT => true},
+            Updated = case maps:get(?OPT_PRIORITY, Flagged, undefined) of
+                NewPriority -> Flagged;
+                _Different -> Flagged#{?OPT_PRIORITY => NewPriority}
+            end,
+            % The Priority element of the tuple refers to the entry in the
+            % pending queue and is kept as is. Only the priority in the job
+            % options changes; it is used when the job is enqueued again.
+            ValTup = {Seq, WorkerLockId, Priority, jiffy:encode(Updated)},
+            erlfdb:set(Tx, Key, erlfdb_tuple:pack(ValTup)),
+            ok
+    end.
+
+
+% Public job lookup. Returns `{ok, JobOpts, JobState}` where JobState is
+% computed by job_state/2, or `not_found`.
+get_job(#{jtx := true} = JTx, Type, JobId) ->
+    #{tx := Tx, jobs_path :=  Jobs} = get_jtx(JTx),
+    DataKey = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job(Tx, DataKey) of
+        not_found ->
+            not_found;
+        {_Seq, WorkerLockId, Priority, JobOpts} ->
+            State = job_state(WorkerLockId, Priority),
+            {ok, JobOpts, State}
+    end.
+
+
+% Worker public API
+
+% Accept a pending job of Type without a maximum priority bound.
+accept(#{jtx := true} = JTx, Type) ->
+    MaxPriority = undefined,
+    accept(JTx, Type, MaxPriority).
+
+
+% Dequeue a pending job of Type and lock it for the caller.
+% Returns `{ok, JobId, WorkerLockId}` with a freshly generated lock id, or
+% `not_found` if couch_jobs_pending:dequeue/3 yields nothing.
+accept(#{jtx := true} = JTx0, Type, MaxPriority) ->
+    JTx = get_jtx(JTx0),
+    #{jtx := true} = JTx,
+    Dequeued = couch_jobs_pending:dequeue(JTx, Type, MaxPriority),
+    case Dequeued of
+        <<_/binary>> = JobId ->
+            LockId = fabric2_util:uuid(),
+            update_lock(JTx, Type, JobId, LockId),
+            update_activity(JTx, Type, JobId),
+            update_watch(JTx, Type),
+            {ok, JobId, LockId};
+        not_found ->
+            not_found
+    end.
+
+
+% Finish a job held under WorkerLockId. JobOpts are merged over the stored
+% options before the job is passed to maybe_enqueue/4. Returns `ok`, or
+% `worker_conflict` if the lock does not match (or the job is gone).
+finish(#{jtx := true} = JTx0, Type, JobId, JobOpts, WorkerLockId) ->
+    JTx = get_jtx(JTx0),
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job_and_status(Tx, Key, WorkerLockId) of
+        {worker_conflict, _} ->
+            worker_conflict;
+        {Status, {Seq, _, _, CurOpts}} when
+                Status =:= ok orelse Status =:= canceled ->
+            % A canceled job may still update its data one last time
+            clear_activity(JTx, Type, Seq),
+            Merged = maps:merge(CurOpts, JobOpts),
+            maybe_enqueue(JTx, Type, JobId, Merged),
+            update_watch(JTx, Type),
+            ok
+    end.
+
+
+% Update the options of a running job held under WorkerLockId and refresh
+% its activity entry.
+%
+% Returns the result of update_watch/2 on success, or the status
+% (`canceled` / `worker_conflict`) from get_job_and_status/3 otherwise.
+% Raises if the lock matches but the job has a pending priority set.
+update(#{jtx := true} = JTx0, Type, JobId, JobOpts, WorkerLockId) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx = get_jtx(JTx0),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job_and_status(Tx, Key, WorkerLockId) of
+        {ok, {Seq, WorkerLockId, null, JobOptsCur}} ->
+            % update_lock/4 stores a versionstamp as Seq when the job is
+            % accepted, so Seq is not null for a running job. The activity
+            % entry is keyed by that Seq: drop the old one before a new one
+            % is written.
+            case Seq of
+                null -> ok;
+                _ -> clear_activity(JTx, Type, Seq)
+            end,
+            update_activity(JTx, Type, JobId),
+            JobOpts1 = maps:merge(JobOptsCur, JobOpts),
+            JobOptsEnc = jiffy:encode(JobOpts1),
+            % Same value layout as update_lock/4, so the stored Seq follows
+            % the new activity entry and finish/5 can clear it later.
+            ValTup = {?UNSET_VS, WorkerLockId, null, JobOptsEnc},
+            Val = erlfdb_tuple:pack_vs(ValTup),
+            erlfdb:set_versionstamped_value(Tx, Key, Val),
+            update_watch(JTx, Type);
+        {ok, InvalidState} ->
+            error({couch_job_invalid_updata_state, InvalidState});
+        {Status, _} when Status =/= ok ->
+            Status
+    end.
+
+
+% Type and activity monitoring API
+
+% Store the activity timeout for a job Type.
+%
+% The value is stored as a packed 1-tuple `{Timeout}` because
+% erlfdb_tuple:pack/1 only accepts tuples.
+set_activity_timeout(#{jtx := true} = JTx, Type, Timeout) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+    Val = erlfdb_tuple:pack({Timeout}),
+    erlfdb:set(Tx, Key, Val).
+
+
+% Read the activity timeout stored for a job Type.
+%
+% erlfdb_tuple:unpack/1 yields a tuple, so the single element is extracted
+% and returned on its own. Raises if no timeout has been stored for Type.
+get_activity_timeout(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+    Val = erlfdb:wait(erlfdb:get(Tx, Key)),
+    {Timeout} = erlfdb_tuple:unpack(Val),
+    Timeout.
+
+
+% List all job types which have an activity timeout entry.
+%
+% Scans the (?ACTIVITY_TIMEOUT, Type) key range and returns the Type element
+% of every key found.
+get_types(#{jtx := true} = JTx) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT}, Jobs),
+    Opts = [{streaming_mode, want_all}],
+    Future = erlfdb:get_range_startswith(Tx, Prefix, Opts),
+    % Range reads yield {Key, Value} pairs
+    lists:map(fun({K, _V}) ->
+        {Type} = erlfdb_tuple:unpack(K, Prefix),
+        Type
+    end, erlfdb:wait(Future)).
+
+
+% Read the raw value stored under the (?WATCHES, Type) key.
+get_activity_vs(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    WatchKey = erlfdb_tuple:pack({?WATCHES, Type}, Jobs),
+    Future = erlfdb:get(Tx, WatchKey),
+    erlfdb:wait(Future).
+
+
+% Call erlfdb:get_and_watch/2 on the (?WATCHES, Type) key and return the
+% resulting 2-tuple as is.
+%
+% NOTE(review): confirm whether the first element is a value or a future
+% which the caller still has to wait on.
+get_activity_vs_and_watch(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    WatchKey = erlfdb_tuple:pack({?WATCHES, Type}, Jobs),
+    {_Val, _WatchFuture} = Result = erlfdb:get_and_watch(Tx, WatchKey),
+    Result.
+
+
+% List the job ids stored in the (?ACTIVITY, Type, Seq) range whose keys sort
+% after (Type, Versionstamp).
+get_active_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+    StartKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
+    StartKeySel = erlfdb_key:first_greater_than(StartKey),
+    {_, EndKey} = erlfdb_tuple:range({Type}, Prefix),
+    Opts = [{streaming_mode, want_all}],
+    Future = erlfdb:get_range(Tx, StartKeySel, EndKey, Opts),
+    % Activity entries are stored as Key => JobId
+    lists:map(fun({_K, JobId}) -> JobId end, erlfdb:wait(Future)).
+
+
+% List the job ids stored in the (?ACTIVITY, Type, Seq) range from the start
+% of the Type range up to the key selected by
+% erlfdb_key:last_less_or_equal/1 for (Type, Versionstamp).
+get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+    {StartKey, _} = erlfdb_tuple:range({Type}, Prefix),
+    UpperKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
+    UpperKeySel = erlfdb_key:last_less_or_equal(UpperKey),
+    RangeOpts = [{streaming_mode, want_all}],
+    Rows = erlfdb:wait(erlfdb:get_range(Tx, StartKey, UpperKeySel, RangeOpts)),
+    lists:map(fun({_K, JobId}) -> JobId end, Rows).
+
+
+% For every job id returned by get_inactive_since/3: clear its activity
+% entry and pass it to maybe_enqueue/4. The type watch is bumped if at least
+% one job was processed. Returns the list of processed job ids.
+re_enqueue_inactive(#{jtx := true} = JTx, Type, Versionstamp) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    JobIds = get_inactive_since(JTx, Type, Versionstamp),
+    ReEnqueue = fun(JobId) ->
+        Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+        {Seq, _, _, JobOpts} = get_job(Tx, Key),
+        clear_activity(JTx, Type, Seq),
+        maybe_enqueue(JTx, Type, JobId, JobOpts)
+    end,
+    lists:foreach(ReEnqueue, JobIds),
+    case JobIds of
+        [] -> ok;
+        [_ | _] -> update_watch(JTx, Type)
+    end,
+    JobIds.
+
+
+% Debug and testing API
+
+% Clear every key range belonging to Type in all sections of the jobs
+% keyspace. Returns `ok`.
+clear_type(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Sections = [?DATA, ?PENDING, ?WATCHES, ?ACTIVITY_TIMEOUT, ?ACTIVITY],
+    ClearSection = fun(Section) ->
+        SectionPrefix = erlfdb_tuple:pack({Section, Type}, Jobs),
+        erlfdb:clear_range_startswith(Tx, SectionPrefix)
+    end,
+    lists:foreach(ClearSection, Sections).
+
+
+% Cache initialization API. Called from the supervisor just to create the ETS
+% table. It returns `ignore` to tell supervisor it won't actually start any
+% process, which is what we want here.
+%
+init_cache() ->
+    % Public named table with read and write concurrency enabled
+    TableOpts = [
+        public,
+        named_table,
+        {read_concurrency, true},
+        {write_concurrency, true}
+    ],
+    ets:new(?MODULE, TableOpts),
+    ignore.
+
+
+% Cached job transaction object. This object wraps a transaction, caches the
+% directory lookup path, and the metadata version. The function can be used
+% from inside or outside a transaction. When used from inside a transaction
+% it verifies whether the metadata changed and refreshes automatically.
+%
+get_jtx() ->
+    % No transaction at hand: use the cached context
+    NoTx = undefined,
+    get_jtx(NoTx).
+
+
+% Return a jobs transaction context for the given argument:
+%   - a map with a `tx` field: its transaction (or `undefined`) is used
+%   - `undefined`: the cached context, carrying `tx => undefined`
+%   - an erlfdb transaction: the cached context bound to that transaction,
+%     refreshed if the metadata version changed
+get_jtx(#{tx := Tx} = _TxDb) ->
+    get_jtx(Tx);
+
+get_jtx(undefined = _Tx) ->
+    case ets:lookup(?MODULE, ?JOBS) of
+        [{_, #{} = JTx}] ->
+            JTx;
+        [] ->
+            % init_jtx(undefined) runs in its own transaction which is over
+            % by the time it returns. Strip that handle so a cache miss
+            % yields the same shape as a cache hit (tx => undefined).
+            JTx = update_jtx_cache(init_jtx(undefined)),
+            JTx#{tx := undefined}
+    end;
+
+get_jtx({erlfdb_transaction, _} = Tx) ->
+    case ets:lookup(?MODULE, ?JOBS) of
+        [{_, #{} = JTx}] -> ensure_current(JTx#{tx := Tx});
+        [] -> update_jtx_cache(init_jtx(Tx))
+    end.
+
+
+% Transaction processing to be used with couch jobs' specific transaction
+% contexts
+%
+tx(JTx, Fun) when is_function(Fun, 1), is_map(JTx) ->
+    % Only contexts tagged by init_jtx/1 are accepted
+    #{jtx := true} = JTx,
+    fabric2_fdb:transactional(JTx, Fun).
+
+
+% Utility fdb functions used by other modules in couch_jobs. Maybe move these
+% to a separate module if the list keeps growing
+
+% True if Term is ?UNSET_VS or contains it anywhere inside nested tuples or
+% lists; false otherwise.
+has_versionstamp(Term) ->
+    case Term of
+        ?UNSET_VS ->
+            true;
+        _ when is_tuple(Term) ->
+            has_versionstamp(tuple_to_list(Term));
+        [Head | Tail] ->
+            has_versionstamp(Head) orelse has_versionstamp(Tail);
+        _ ->
+            false
+    end.
+
+
+% Private helper functions
+
+% Set the cancel option on a job. If the option is already set nothing is
+% written. Always returns `ok`.
+cancel(#{jtx := true}, _Key, {_, _, _, #{?OPT_CANCEL := true}}) ->
+    ok;
+
+cancel(#{jtx := true, tx := Tx}, Key, Job) ->
+    {Seq, WorkerLockId, Priority, JobOpts} = Job,
+    JobOpts1 = JobOpts#{?OPT_CANCEL => true},
+    JobOptsEnc = jiffy:encode(JobOpts1),
+    Val = erlfdb_tuple:pack({Seq, WorkerLockId, Priority, JobOptsEnc}),
+    erlfdb:set(Tx, Key, Val),
+    ok.
+
+
+% Write the job data entry and, when the resubmit option is set and the job
+% is not canceled, add the job to the pending queue.
+%
+% The resubmit option is consumed: it is removed from the stored options.
+% With no priority in the options, ?UNSET_VS is used as the priority.
+% Otherwise the job is stored with no lock and no priority. Returns `ok`.
+maybe_enqueue(#{jtx := true} = JTx, Type, JobId, JobOpts) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    Resubmit = maps:get(?OPT_RESUBMIT, JobOpts, false) == true,
+    Cancel = maps:get(?OPT_CANCEL, JobOpts, false) == true,
+    JobOpts1 = maps:without([?OPT_RESUBMIT], JobOpts),
+    Priority = maps:get(?OPT_PRIORITY, JobOpts1, ?UNSET_VS),
+    JobOptsEnc = jiffy:encode(JobOpts1),
+    case Resubmit andalso not Cancel of
+        true ->
+            ValTup = {null, null, Priority, JobOptsEnc},
+            % pack_vs is only used when the tuple really carries an
+            % incomplete versionstamp; plain priorities use pack + set.
+            case has_versionstamp(Priority) of
+                true ->
+                    VsVal = erlfdb_tuple:pack_vs(ValTup),
+                    erlfdb:set_versionstamped_value(Tx, Key, VsVal);
+                false ->
+                    erlfdb:set(Tx, Key, erlfdb_tuple:pack(ValTup))
+            end,
+            couch_jobs_pending:enqueue(JTx, Type, Priority, JobId);
+        false ->
+            Val = erlfdb_tuple:pack({null, null, null, JobOptsEnc}),
+            erlfdb:set(Tx, Key, Val)
+    end,
+    ok.
+
+
+% Read and decode the job stored under Key. Returns
+% `{Seq, WorkerLockId, Priority, JobOpts}` with JobOpts decoded into a map,
+% or `not_found`.
+get_job({erlfdb_transaction, _} = Tx, Key) ->
+    Stored = erlfdb:wait(erlfdb:get(Tx, Key)),
+    case Stored of
+        not_found ->
+            not_found;
+        <<_/binary>> ->
+            {Seq, LockId, Priority, OptsEnc} = erlfdb_tuple:unpack(Stored),
+            {Seq, LockId, Priority, jiffy:decode(OptsEnc, [return_maps])}
+    end.
+
+
+% Read a job and classify it relative to the caller's lock id:
+%   `{worker_conflict, Job | not_found}` - lock differs or job is missing
+%   `{canceled, Job}`                    - lock matches, cancel option set
+%   `{ok, Job}`                          - lock matches, not canceled
+get_job_and_status(Tx, Key, WorkerLockId) ->
+    case get_job(Tx, Key) of
+        not_found ->
+            {worker_conflict, not_found};
+        {_, OtherLock, _, _} = Job when WorkerLockId =/= OtherLock ->
+            {worker_conflict, Job};
+        {_, _, _, #{?OPT_CANCEL := true}} = Job ->
+            {canceled, Job};
+        {_, _, _, #{}} = Job ->
+            {ok, Job}
+    end.
+
+
+% Add an activity entry (?ACTIVITY, Type, <versionstamp>) => JobId.
+update_activity(#{jtx := true, tx := Tx, jobs_path := Jobs}, Type, JobId) ->
+    KeyTup = {?ACTIVITY, Type, ?UNSET_VS},
+    VsKey = erlfdb_tuple:pack_vs(KeyTup, Jobs),
+    erlfdb:set_versionstamped_key(Tx, VsKey, JobId).
+
+
+% Remove the activity entry stored under (?ACTIVITY, Type, Seq).
+clear_activity(#{jtx := true, tx := Tx, jobs_path := Jobs}, Type, Seq) ->
+    ActivityKey = erlfdb_tuple:pack({?ACTIVITY, Type, Seq}, Jobs),
+    erlfdb:clear(Tx, ActivityKey).
+
+
+% Bump the (?WATCHES, Type) key with a fresh versionstamp.
+%
+% The value is a packed 1-tuple holding an incomplete versionstamp;
+% erlfdb:set_versionstamped_value/3 needs a packed binary, not a raw tuple.
+update_watch(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?WATCHES, Type}, Jobs),
+    Val = erlfdb_tuple:pack_vs({?UNSET_VS}),
+    erlfdb:set_versionstamped_value(Tx, Key, Val).
+
+
+% Lock a job for a worker: the stored tuple gets a versionstamp as Seq, the
+% given WorkerLockId, and a null priority. Raises unless the job currently
+% has neither a Seq nor a lock.
+update_lock(#{jtx := true, tx := Tx, jobs_path := Jobs}, Type, JobId,
+        WorkerLockId) ->
+    DataKey = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job(Tx, DataKey) of
+        {null, null, _, JobOpts} ->
+            OptsEnc = jiffy:encode(JobOpts),
+            Locked = {?UNSET_VS, WorkerLockId, null, OptsEnc},
+            LockedVal = erlfdb_tuple:pack_vs(Locked),
+            erlfdb:set_versionstamped_value(Tx, DataKey, LockedVal);
+        InvalidState ->
+            error({couch_job_invalid_accept_state, InvalidState})
+    end.
+
+
+% Derive the job state from its lock id and pending priority:
+%   no lock, no priority -> finished
+%   lock present         -> running
+%   priority present     -> pending
+job_state(null, null) ->
+    finished;
+
+job_state(WorkerLockId, _Priority) when WorkerLockId =/= null ->
+    running;
+
+job_state(_WorkerLockId, Priority) when Priority =/= null ->
+    pending;
+
+job_state(WorkerLockId, Priority) ->
+    error({invalid_job_state, {WorkerLockId, Priority}}).
+
+
+
+
+
+% This is a transaction context object similar to the Db = #{} one from
+% fabric2_fdb. It is used to cache the jobs path directory (to avoid extra
+% lookups on every operation) and to check for metadata changes (in case the
+% directory changes).
+%
+% Build a fresh jobs transaction context.
+%
+% With `undefined` a new transaction is opened through
+% fabric2_fdb:transactional/1. With an erlfdb transaction the "couchdb"
+% directory is created or opened, the jobs key prefix is derived from it and
+% the current metadata version is read.
+init_jtx(undefined) ->
+    fabric2_fdb:transactional(fun(Tx) -> init_jtx(Tx) end);
+
+init_jtx({erlfdb_transaction, _} = Tx) ->
+    Root = erlfdb_directory:root(),
+    CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+    LayerPrefix = erlfdb_directory:get_name(CouchDB),
+    JobsPath = erlfdb_tuple:pack({?JOBS}, LayerPrefix),
+    Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
+    % layer_prefix, md_version and tx here match db map fields in fabric2_fdb
+    % but we also assert that this is a job transaction using the jtx => true
+    % field. The prefix is stored as jobs_path, the key every function in
+    % this module matches on.
+    #{
+        jtx => true,
+        tx => Tx,
+        layer_prefix => LayerPrefix,
+        jobs_path => JobsPath,
+        md_version => Version
+    }.
+
+
+% Return JTx if its cached metadata version still equals the one stored in
+% the database; otherwise rebuild the context and refresh the cache.
+ensure_current(#{jtx := true, tx := Tx, md_version := Version} = JTx) ->
+    Current = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
+    case Current =:= Version of
+        true -> JTx;
+        false -> update_jtx_cache(init_jtx(Tx))
+    end.
+
+
+% Store a copy of JTx, with its transaction handle removed, in the ETS cache
+% and return JTx itself unchanged.
+update_jtx_cache(#{jtx := true} = JTx) ->
+    Entry = {?JOBS, JTx#{tx := undefined}},
+    _ = ets:insert(?MODULE, Entry),
+    JTx.
diff --git a/src/couch_jobs/src/couch_jobs_notifier.erl b/src/couch_jobs/src/couch_jobs_notifier.erl
new file mode 100644
index 0000000..d6a4f98
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_notifier.erl
@@ -0,0 +1,210 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_notifier).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/1,
+    subscribe/3,
+    subscribe/4,
+    unsubscribe/2
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+% Defaults used when the "couch_jobs" config section does not set
+% "type_monitor_holdoff_msec" / "type_monitor_timeout_msec".
+-define(TYPE_MONITOR_HOLDOFF_DEFAULT, 1000).
+-define(TYPE_MONITOR_TIMEOUT_DEFAULT, 60000).
+
+
+% Notifier server state.
+%   jtx         - jobs transaction context from couch_jobs_fdb:get_jtx()
+%   type        - job type this notifier was started for
+%   monitor_pid - pid returned by couch_jobs_type_monitor:start/4
+%   subs        - ets table of {{JobId, Ref}, {Fun, Pid, JobState}} rows
+-record(st, {
+    jtx,
+    type,
+    monitor_pid,
+    subs
+}).
+
+
+% Start a notifier for the given job type.
+%
+% The process is deliberately not registered under a local name: notifiers
+% are started per type, and a fixed {local, ?MODULE} name would make every
+% start after the first fail with {error, {already_started, _}}. Callers
+% address the notifier by pid.
+start_link(Type) ->
+    gen_server:start_link(?MODULE, [Type], []).
+
+
+% Subscribe Pid to state changes of JobId; notifications arrive as messages.
+subscribe(Server, JobId, Pid) when is_pid(Pid) ->
+    Request = {subscribe, JobId, nil, Pid},
+    gen_server:call(Server, Request, infinity).
+
+
+% Subscribe to state changes of JobId; the arity-4 Fun is invoked on each
+% change. The subscription is tied to Pid via a monitor in the server.
+subscribe(Server, JobId, Fun, Pid) when is_function(Fun, 4), is_pid(Pid) ->
+    Request = {subscribe, JobId, Fun, Pid},
+    gen_server:call(Server, Request, infinity).
+
+
+% Drop the subscription identified by the reference returned from subscribe.
+unsubscribe(Server, Ref) when is_reference(Ref) ->
+    Request = {unsubscribe, Ref},
+    gen_server:call(Server, Request, infinity).
+
+
+% gen_server callback: create the subscriptions table, read the current
+% activity versionstamp for the type and start the type monitor from it.
+init([Type]) ->
+    JTx = couch_jobs_fdb:get_jtx(),
+    Subs = ets:new(?MODULE, [ordered_set, protected]),
+    St0 = #st{jtx = JTx, type = Type, subs = Subs},
+    VS = get_type_vs(St0),
+    HoldOff = get_holdoff(),
+    Timeout = get_timeout(),
+    MonitorPid = couch_jobs_type_monitor:start(Type, VS, HoldOff, Timeout),
+    {ok, St0#st{monitor_pid = MonitorPid}}.
+
+
+% gen_server callback: no cleanup is performed.
+terminate(_Reason, _State) ->
+    ok.
+
+
+% gen_server call handler.
+%
+%  {subscribe, JobId, Fun, Pid} - replies not_found or finished when there is
+%      nothing to wait for, otherwise monitors Pid, records the subscription
+%      and replies {Ref, JobState}.
+%  {unsubscribe, Ref}           - removes the subscription and its monitor.
+%  {type_updated, VS}           - sent by couch_jobs_type_monitor; notifies
+%      subscribers whose job state changed.
+%  Anything else stops the server with a bad_call reason.
+%
+handle_call({subscribe, JobId, Fun, Pid}, _From, #st{} = St) ->
+    Res = case get_job(St, JobId) of
+        not_found ->
+            not_found;
+        {ok, _, finished} ->
+            finished;
+        {ok, _, JobState} ->
+            Ref = erlang:monitor(process, Pid),
+            ets:insert(St#st.subs, {{JobId, Ref}, {Fun, Pid, JobState}}),
+            {Ref, JobState}
+    end,
+    {reply, Res, St};
+
+handle_call({unsubscribe, Ref}, _From, #st{subs = Subs} = St) ->
+    % The subscriptions table is created without named_table, so it has to be
+    % addressed through the table id kept in the state, not through ?MODULE.
+    true = ets:match_delete(Subs, {{'_', Ref}, '_'}),
+    % Ref is the monitor created at subscribe time; release it and drop any
+    % 'DOWN' message that may already be queued.
+    erlang:demonitor(Ref, [flush]),
+    {reply, ok, St};
+
+% couch_jobs_type_monitor calls this
+handle_call({type_updated, VS}, _From, St) ->
+    ok = notify_subscribers(VS, St),
+    {reply, ok, St};
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+% gen_server callback: no casts are expected, so any cast stops the server.
+handle_cast(Unexpected, State) ->
+    StopReason = {bad_cast, Unexpected},
+    {stop, StopReason, State}.
+
+
+% gen_server callback. A 'DOWN' for a monitored subscriber removes every
+% subscription keyed by that monitor reference; any other message stops the
+% server.
+handle_info({'DOWN', MonRef, process, _Pid, _Reason}, #st{subs = Subs} = St) ->
+    true = ets:match_delete(Subs, {{'_', MonRef}, '_'}),
+    {noreply, St};
+
+handle_info(Unexpected, St) ->
+    StopReason = {bad_info, Unexpected},
+    {stop, StopReason, St}.
+
+
+% gen_server callback: state is carried over unchanged across upgrades.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+% Look up a single job of this notifier's type inside a jobs transaction.
+get_job(#st{jtx = JTx, type = Type}, JobId) ->
+    Lookup = fun(TxCtx) -> couch_jobs_fdb:get_job(TxCtx, Type, JobId) end,
+    couch_jobs_fdb:tx(JTx, Lookup).
+
+
+% Look up the state of every job id in one transaction. Returns a list of
+% {JobId, JobState} pairs where JobState is not_found for missing jobs.
+get_jobs(#st{jtx = JTx, type = Type}, JobIds) ->
+    couch_jobs_fdb:tx(JTx, fun(TxCtx) ->
+        [begin
+            case couch_jobs_fdb:get_job(TxCtx, Type, Id) of
+                not_found -> {Id, not_found};
+                {ok, _, State} -> {Id, State}
+            end
+        end || Id <- JobIds]
+    end).
+
+
+% Read the current activity versionstamp for this notifier's job type.
+% (The module is couch_jobs_fdb; the previous couch_job_fdb spelling does not
+% match the module used by every other call in this file.)
+get_type_vs(#st{jtx = JTx, type = Type}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_activity_vs(JTx1, Type)
+    end).
+
+
+% "Active since" is the list of jobs that have been active (running) and
+% updated at least once since the given versionstamp. These are relatively
+% cheap to find as it is just a range read in the ?ACTIVITY subspace. Only
+% the ids that also appear in SubscribedJobs are returned.
+%
+get_active_since(#st{jtx = JTx, type = Type}, VS, SubscribedJobs) ->
+    Updated = couch_jobs_fdb:tx(JTx, fun(TxCtx) ->
+        couch_jobs_fdb:get_active_since(TxCtx, Type, VS)
+    end),
+    Common = sets:intersection(
+        sets:from_list(Updated),
+        sets:from_list(SubscribedJobs)
+    ),
+    sets:to_list(Common).
+
+
+% Return {Ref, Fun, Pid, JobState} for every subscription on JobId.
+get_subscribers(JobId, #st{subs = Subs}) ->
+    % Use ordered ets's fast matching of partial key prefixes here
+    lists:map(fun([Ref, {Fun, Pid, JobState}]) ->
+        {Ref, Fun, Pid, JobState}
+    end, ets:match(Subs, {{JobId, '$1'}, '$2'})).
+
+
+% Return the sorted, de-duplicated list of job ids with a subscription.
+get_subscribed_job_ids(#st{subs = Subs}) ->
+    Matches = ets:match(Subs, {{'$1', '_'}, '_'}),
+    lists:usort(lists:flatten(Matches)).
+
+
+% Notify subscribers whose job changed state since they were last told.
+%
+% For each subscribed job the current state is determined, every subscriber
+% whose recorded state differs is notified, and the recorded state is
+% refreshed so the same state is not reported again on the next update.
+% Subscriptions of finished or missing jobs are removed afterwards.
+% Returns ok.
+%
+notify_subscribers(VS, #st{subs = Subs} = St) ->
+    JobIds = get_subscribed_job_ids(St),
+    % First gather the easy (cheap) active jobs. Then with those out of way,
+    % inspect each job to get its state.
+    Active = get_active_since(St, VS, JobIds),
+    JobStates = [{JobId, running} || JobId <- Active],
+    JobStates1 = JobStates ++ get_jobs(St, JobIds -- Active),
+    lists:foreach(fun({JobId, JobState}) ->
+        lists:foreach(fun
+            % Variables in a fun head are always fresh bindings, so comparing
+            % with the outer JobState has to be done in a guard.
+            ({_Ref, _Fun, _Pid, PrevState}) when PrevState =:= JobState ->
+                ok;
+            ({Ref, Fun, Pid, _PrevState}) ->
+                notify(JobId, Ref, Fun, Pid, JobState, St),
+                ets:insert(Subs, {{JobId, Ref}, {Fun, Pid, JobState}})
+        end, get_subscribers(JobId, St)),
+        case lists:member(JobState, [finished, not_found]) of
+            true -> ets:match_delete(Subs, {{JobId, '_'}, '_'});
+            false -> ok
+        end
+    end, JobStates1).
+
+
+% Deliver one notification. With an arity-4 callback the callback is run and
+% any exception it raises is logged; otherwise a ?COUCH_JOBS_EVENT message is
+% sent to the subscriber pid.
+notify(JobId, Ref, Fun, Pid, JobState, St) ->
+    case is_function(Fun, 4) of
+        true ->
+            try
+                Fun(Ref, St#st.type, JobId, JobState)
+            catch
+                Tag:Err ->
+                    ErrMsg = "~p : callback ~p failed ~p ~p =>  ~p:~p",
+                    LogArgs = [?MODULE, Fun, JobId, JobState, Tag, Err],
+                    couch_log:error(ErrMsg, LogArgs)
+            end;
+        false ->
+            Pid ! {?COUCH_JOBS_EVENT, Ref, St#st.type, JobId, JobState}
+    end.
+
+
+% Configured type monitor hold-off, in msec.
+get_holdoff() ->
+    Default = ?TYPE_MONITOR_HOLDOFF_DEFAULT,
+    config:get_integer("couch_jobs", "type_monitor_holdoff_msec", Default).
+
+
+% Configured type monitor watch timeout, in msec.
+get_timeout() ->
+    Default = ?TYPE_MONITOR_TIMEOUT_DEFAULT,
+    config:get_integer("couch_jobs", "type_monitor_timeout_msec", Default).
diff --git a/src/couch_jobs/src/couch_jobs_notifier_sup.erl b/src/couch_jobs/src/couch_jobs_notifier_sup.erl
new file mode 100644
index 0000000..6705b45
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_notifier_sup.erl
@@ -0,0 +1,57 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_notifier_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0,
+
+    start_notifier/1,
+    stop_notifier/1
+]).
+
+-export([
+    init/1
+]).
+
+
+% Start the notifier supervisor, registered locally under the module name.
+start_link() ->
+    SupName = {local, ?MODULE},
+    supervisor:start_link(SupName, ?MODULE, []).
+
+
+% Start a notifier child for Type; Type is appended to the child start args.
+start_notifier(Type) ->
+    ExtraArgs = [Type],
+    supervisor:start_child(?MODULE, ExtraArgs).
+
+
+% Terminate the notifier child identified by its pid.
+stop_notifier(NotifierPid) ->
+    supervisor:terminate_child(?MODULE, NotifierPid).
+
+
+% supervisor callback: a simple_one_for_one supervisor whose temporary
+% children are couch_jobs_notifier processes.
+init(_) ->
+    NotifierSpec = #{
+        id => couch_jobs_monitor,
+        restart => temporary,
+        start => {couch_jobs_notifier, start_link, []}
+    },
+    SupFlags = #{
+        strategy => simple_one_for_one,
+        intensity => 10,
+        period => 3
+    },
+    {ok, {SupFlags, [NotifierSpec]}}.
diff --git a/src/couch_jobs/src/couch_jobs_pending.erl b/src/couch_jobs/src/couch_jobs_pending.erl
new file mode 100644
index 0000000..d015c04
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_pending.erl
@@ -0,0 +1,150 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_pending).
+
+
+-export([
+    enqueue/4,
+    dequeue/3,
+    remove/4
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+% Maximum number of pending keys read per dequeue attempt; one of them is
+% then picked at random.
+% TODO: make this configurable or auto-adjustable based on retries.
+%
+-define(RANGE_LIMIT, 256).
+
+
+% Data model
+%
+% (?JOBS, ?PENDING, Type, Priority, JobId) = ""
+
+
+% Public API
+
+% Enqueue a job into the pending queue. Priority determines the place in the
+% queue. If Priority contains an incomplete versionstamp the key is written
+% with a versionstamped-key operation, otherwise with a plain set.
+%
+% The value is an empty binary, as in the data model above; FDB values are
+% binaries, so an atom cannot be stored. The jobs prefix is read from the
+% jobs_prefix field, which is the name couch_jobs_fdb:init_jtx/1 stores it
+% under.
+%
+enqueue(#{jtx := true} = JTx, Type, Priority, JobId) ->
+    #{tx := Tx, jobs_prefix := Jobs} = JTx,
+    case couch_jobs_fdb:has_versionstamp(Priority) of
+        true ->
+            Key = erlfdb_tuple:pack_vs({?PENDING, Type, Priority, JobId}, Jobs),
+            erlfdb:set_versionstamped_key(Tx, Key, <<>>);
+        false ->
+            Key = erlfdb_tuple:pack({?PENDING, Type, Priority, JobId}, Jobs),
+            erlfdb:set(Tx, Key, <<>>)
+    end.
+
+
+% Dequeue a job from the front of the queue. Returns the JobId, or not_found
+% if there is nothing to dequeue.
+%
+% If MaxPriority is not specified (undefined), only jobs with the same
+% priority as the item at the front of the queue are considered. Two
+% extremes are useful to consider:
+%
+%  * Priority is just one static value (say null, or "normal"). In that case,
+%  the queue is effectively a bag of tasks that can be grabbed in any order,
+%  which should minimize contention.
+%
+%  * Each job has a unique priority value, for example a versionstamp. In that
+%  case, the queue has strict FIFO behavior, but there will be more contention
+%  at the front of the queue.
+%
+dequeue(#{jtx := true} = JTx, Type, undefined) ->
+    #{tx := Tx, jobs_prefix := Jobs} = JTx,
+    Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs),
+    case get_front_priority(Tx, Prefix) of
+        not_found ->
+            not_found;
+        Priority ->
+            % range/2 takes a tuple; wrap the priority so that the range
+            % covers every {Priority, JobId} key under Prefix.
+            {Start, End} = erlfdb_tuple:range({Priority}, Prefix),
+            case clear_random_key_from_range(Tx, Start, End) of
+                not_found ->
+                    not_found;
+                <<_/binary>> = PendingKey ->
+                    {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+                    JobId
+            end
+    end;
+
+% If MaxPriority is specified, any job between the front of the queue and
+% up until MaxPriority is considered. Workers may randomly pick jobs in that
+% range. That can be used to avoid contention at the expense of strict dequeue
+% ordering. For instance if priorities are 0-high, 1-normal and 2-low, and we
+% wish to process normal and urgent jobs, then MaxPriority=1-normal would
+% accomplish that.
+%
+dequeue(#{jtx := true} = JTx, Type, MaxPriority) ->
+    #{tx := Tx, jobs_prefix := Jobs} = JTx,
+    Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs),
+    StartKeySel = erlfdb_key:first_greater_than(Prefix),
+    % Prefix already contains {?PENDING, Type}, so only the remaining tuple
+    % elements are packed on top of it.
+    End = erlfdb_tuple:pack({MaxPriority, <<16#FF>>}, Prefix),
+    % The end of a range read is exclusive, so the selector has to resolve to
+    % the first key past the range; otherwise the last pending key is skipped.
+    EndKeySel = erlfdb_key:first_greater_or_equal(End),
+    case clear_random_key_from_range(Tx, StartKeySel, EndKeySel) of
+        not_found ->
+            not_found;
+        <<_/binary>> = PendingKey ->
+            {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+            JobId
+    end.
+
+
+% Remove a job from the pending queue. This is used, for example, when a job is
+% canceled while it was waiting in the pending queue.
+%
+% The jobs prefix is read from the jobs_prefix field, which is the name
+% couch_jobs_fdb:init_jtx/1 stores it under.
+%
+remove(#{jtx := true} = JTx, Type, Priority, JobId) ->
+    #{tx := Tx, jobs_prefix := Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?PENDING, Type, Priority, JobId}, Jobs),
+    erlfdb:clear(Tx, Key).
+
+
+% Private functions
+
+% The priority of the item at the front, or not_found if the queue is empty.
+% If there are multiple items with the same priority, workers can randomly
+% pick between them to avoid contention. The read is a snapshot read limited
+% to a single key.
+%
+get_front_priority(Tx, Prefix) ->
+    Opts = [{limit, 1}, {snapshot, true}],
+    case erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)) of
+        [] ->
+            not_found;
+        [{FrontKey, _}] ->
+            {Priority, _} = erlfdb_tuple:unpack(FrontKey, Prefix),
+            Priority
+    end.
+
+
+% Read up to ?RANGE_LIMIT keys from a snapshot of the range, then randomly
+% pick one of them to clear. Before clearing, add a read conflict on the key
+% in case other workers have picked the same key. Returns the cleared key, or
+% not_found if the range is empty.
+%
+clear_random_key_from_range(Tx, Start, End) ->
+    Opts = [
+        {limit, ?RANGE_LIMIT},
+        {snapshot, true}
+    ],
+    case erlfdb:wait(erlfdb:get_range(Tx, Start, End, Opts)) of
+        [] ->
+            not_found;
+        [{_, _} | _] = KVs ->
+            Index = rand:uniform(length(KVs)),
+            {Key, _} = lists:nth(Index, KVs),
+            erlfdb:add_read_conflict_key(Tx, Key),
+            erlfdb:clear(Tx, Key),
+            Key
+    end.
diff --git a/src/couch_jobs/src/couch_jobs_server.erl b/src/couch_jobs/src/couch_jobs_server.erl
new file mode 100644
index 0000000..27950ba
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_server.erl
@@ -0,0 +1,153 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_server).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/0,
+    get_notifier_server/1,
+    force_check_types/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+% Defaults used when the "couch_jobs" config section does not set
+% "type_check_period_msec" / "type_check_max_jitter_msec".
+-define(TYPE_CHECK_PERIOD_DEFAULT, 5000).
+-define(MAX_JITTER_DEFAULT, 5000).
+
+
+% Start the jobs server, registered locally under the module name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, nil, []).
+
+
+% Run a type check immediately and wait for it to complete.
+% This is for testing and debugging mostly.
+force_check_types() ->
+    gen_server:call(?MODULE, force_check_types, infinity).
+
+
+% Return {ok, NotifierPid} for a known job type, {error, not_found} otherwise.
+get_notifier_server(Type) ->
+    case get_type_pid_refs(Type) of
+        not_found ->
+            {error, not_found};
+        {{_MonPid, _MonRef}, {NotifierPid, _NotifierRef}} ->
+            {ok, NotifierPid}
+    end.
+
+
+% gen_server callback: create the named table that maps a job type to its
+% activity monitor and notifier {Pid, Ref} pairs.
+% NOTE(review): nothing visible here sends the first check_types message or
+% calls schedule_check/0, so periodic checks appear never to start - confirm.
+init(_) ->
+    TableOpts = [protected, named_table],
+    ets:new(?MODULE, TableOpts),
+    {ok, nil}.
+
+
+% gen_server callback: no cleanup is performed.
+terminate(_Reason, _State) ->
+    ok.
+
+
+% gen_server callback. force_check_types runs a type check synchronously;
+% any other call stops the server with a bad_call reason.
+handle_call(force_check_types, _From, State) ->
+    check_types(),
+    {reply, ok, State};
+
+handle_call(Unexpected, _From, State) ->
+    BadCall = {bad_call, Unexpected},
+    {stop, BadCall, BadCall, State}.
+
+
+% gen_server callback: no casts are expected, so any cast stops the server.
+handle_cast(Unexpected, State) ->
+    StopReason = {bad_cast, Unexpected},
+    {stop, StopReason, State}.
+
+
+% gen_server callback.
+%  check_types - run a type check and schedule the next one.
+%  'DOWN'      - a monitored process exited: log it and stop the server.
+%  other       - stop the server with a bad_info reason.
+handle_info(check_types, State) ->
+    check_types(),
+    schedule_check(),
+    {noreply, State};
+
+handle_info({'DOWN', _MonRef, process, Pid, Reason}, State) ->
+    couch_log:error("~p : process ~p exited with ~p", [?MODULE, Pid, Reason]),
+    StopReason = {unexpected_process_exit, Pid, Reason},
+    {stop, StopReason, State};
+
+handle_info(Unexpected, State) ->
+    {stop, {bad_info, Unexpected}, State}.
+
+
+% gen_server callback: state is carried over unchanged across upgrades.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+% Reconcile the job types found in FDB with those tracked in the ets table:
+% start monitors for new types and stop them for types that disappeared.
+check_types() ->
+    InFdb = fdb_types(),
+    InEts = ets_types(),
+    Added = InFdb -- InEts,
+    Removed = InEts -- InFdb,
+    lists:foreach(fun start_monitors/1, Added),
+    lists:foreach(fun stop_monitors/1, Removed).
+
+
+% Start the activity monitor and the notifier for Type, monitor both
+% processes and record their {Pid, Ref} pairs in the ets table. Raises an
+% error if either process fails to start.
+start_monitors(Type) ->
+    MonResult = couch_jobs_activity_monitor_sup:start_monitor(Type),
+    MonPidRef = case MonResult of
+        {error, MonErr} ->
+            error({failed_to_start_monitor, Type, MonErr});
+        {ok, MonPid} ->
+            {MonPid, monitor(process, MonPid)}
+    end,
+    NotifierResult = couch_jobs_notifier_sup:start_notifier(Type),
+    NotifierPidRef = case NotifierResult of
+        {error, NotifierErr} ->
+            error({failed_to_start_notifier, Type, NotifierErr});
+        {ok, NotifierPid} ->
+            {NotifierPid, monitor(process, NotifierPid)}
+    end,
+    ets:insert_new(?MODULE, {Type, MonPidRef, NotifierPidRef}).
+
+
+% Stop the activity monitor and the notifier for Type, release their
+% monitors and forget the type.
+%
+% The ets row has to be removed as well: otherwise the type keeps showing up
+% in ets_types/0 and the next check would try to stop the already terminated
+% children again.
+stop_monitors(Type) ->
+    {{MonPid, MonRef}, {NotifierPid, NotifierRef}} = get_type_pid_refs(Type),
+    ok = couch_jobs_activity_monitor_sup:stop_monitor(MonPid),
+    demonitor(MonRef, [flush]),
+    ok = couch_jobs_notifier_sup:stop_notifier(NotifierPid),
+    demonitor(NotifierRef, [flush]),
+    ets:delete(?MODULE, Type).
+
+
+% Return {MonPidRef, NotifierPidRef} for Type, or not_found if the type is
+% not in the ets table.
+get_type_pid_refs(Type) ->
+    case ets:lookup(?MODULE, Type) of
+        [] ->
+            not_found;
+        [{_Type, MonPidRef, NotifierPidRef}] ->
+            {MonPidRef, NotifierPidRef}
+    end.
+
+
+% All job types currently recorded in the ets table.
+ets_types() ->
+    Matches = ets:match(?MODULE, {'$1', '_', '_'}),
+    lists:flatten(Matches).
+
+
+fdb_types() ->
+     couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:get_types()
+    end).
+
+
+% Schedule the next check_types message after the configured period plus a
+% random jitter. The jitter is capped at the period (and at the configured
+% maximum) and is at least 1 msec, since rand:uniform/1 needs a positive
+% argument. Returns the timer reference.
+schedule_check() ->
+    Timeout = get_period_msec(),
+    MaxJitter = min(Timeout, get_max_jitter_msec()),
+    Wait = Timeout + rand:uniform(max(1, MaxJitter)),
+    erlang:send_after(Wait, self(), check_types).
+
+
+% Configured period between type checks, in msec.
+get_period_msec() ->
+    Default = ?TYPE_CHECK_PERIOD_DEFAULT,
+    config:get_integer("couch_jobs", "type_check_period_msec", Default).
+
+
+% Configured upper bound for the type check jitter, in msec.
+get_max_jitter_msec() ->
+    Default = ?MAX_JITTER_DEFAULT,
+    config:get_integer("couch_jobs", "type_check_max_jitter_msec", Default).
diff --git a/src/couch_jobs/src/couch_jobs_sup.erl b/src/couch_jobs/src/couch_jobs_sup.erl
new file mode 100644
index 0000000..20d5440
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_sup.erl
@@ -0,0 +1,66 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1
+]).
+
+
+% Start the top-level couch_jobs supervisor, registered locally.
+start_link() ->
+    SupName = {local, ?MODULE},
+    supervisor:start_link(SupName, ?MODULE, []).
+
+
+% supervisor callback: rest_for_one over, in start order, the FDB cache
+% initializer, the activity monitor supervisor, the notifier supervisor and
+% the jobs server.
+init([]) ->
+    FdbCache = #{
+        id => couch_jobs_fdb,
+        restart => transient,
+        start => {couch_jobs_fdb, init_cache, []}
+    },
+    ActivityMonitorSup = #{
+        id => couch_jobs_activity_monitor_sup,
+        restart => permanent,
+        shutdown => brutal_kill,
+        type => supervisor,
+        start => {couch_jobs_activity_monitor_sup, start_link, []}
+    },
+    NotifierSup = #{
+        id => couch_jobs_notifier_sup,
+        restart => permanent,
+        shutdown => brutal_kill,
+        type => supervisor,
+        start => {couch_jobs_notifier_sup, start_link, []}
+    },
+    Server = #{
+        id => couch_jobs_server,
+        restart => permanent,
+        shutdown => brutal_kill,
+        start => {couch_jobs_server, start_link, []}
+    },
+    SupFlags = #{
+        strategy => rest_for_one,
+        intensity => 1,
+        period => 5
+    },
+    {ok, {SupFlags, [FdbCache, ActivityMonitorSup, NotifierSup, Server]}}.
diff --git a/src/couch_jobs/src/couch_jobs_type_monitor.erl b/src/couch_jobs/src/couch_jobs_type_monitor.erl
new file mode 100644
index 0000000..84dd8fc
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_type_monitor.erl
@@ -0,0 +1,81 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_type_monitor).
+
+
+-export([
+    start/4,
+    stop/1
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+% Type monitor loop state.
+%   jtx       - jobs transaction context from couch_jobs_fdb:get_jtx()
+%   type      - job type being watched
+%   vs        - last activity versionstamp seen for the type
+%   parent    - pid that called start/4; receives {type_updated, VS} calls
+%   timestamp - system time (msec) taken when the last notification was
+%               prepared; starts at 0
+%   holdoff   - minimum msec between notifications
+%   timeout   - msec to wait on the FDB watch before re-reading
+-record(st, {
+    jtx,
+    type,
+    vs,
+    parent,
+    timestamp,
+    holdoff,
+    timeout
+}).
+
+
+% Spawn a linked monitor process for Type. The caller becomes the parent that
+% is notified on updates. The jobs transaction context is obtained inside
+% the new process. Returns the pid.
+start(Type, VS, HoldOff, Timeout) ->
+    Parent = self(),
+    Init = fun() ->
+        St = #st{
+            jtx = couch_jobs_fdb:get_jtx(),
+            type = Type,
+            vs = VS,
+            parent = Parent,
+            timestamp = 0,
+            holdoff = HoldOff,
+            timeout = Timeout
+        },
+        loop(St)
+    end,
+    spawn_link(Init).
+
+
+% Kill the monitor process and wait until it is gone. The process is
+% unlinked first so that the kill does not propagate to the caller.
+stop(Pid) ->
+    MonRef = monitor(process, Pid),
+    unlink(Pid),
+    exit(Pid, kill),
+    receive
+        {'DOWN', MonRef, _, _, _} -> ok
+    end.
+
+
+% Main loop: read the activity versionstamp and set a watch, notify the
+% parent if the versionstamp changed, then wait for the watch to fire or for
+% the timeout to elapse before going around again.
+%
+% erlfdb:wait/2 signals an expired timeout by raising error:{timeout, _};
+% that is the normal "nothing happened" case here and must not end the loop.
+% NOTE(review): the timed-out watch is not cancelled here - confirm whether
+% it needs to be released explicitly.
+loop(#st{vs = VS, timeout = Timeout} = St) ->
+    {St1, Watch} = case get_vs_and_watch(St) of
+        {VS1, W} when VS1 =/= VS -> {notify(St#st{vs = VS1}), W};
+        {VS, W} -> {St, W}
+    end,
+    try
+        erlfdb:wait(Watch, [{timeout, Timeout}])
+    catch
+        error:{timeout, _} -> ok
+    end,
+    loop(St1).
+
+
+% Tell the parent that the type's versionstamp changed, but not more often
+% than once per hold-off interval: if the previous notification was less
+% than HoldOff msec ago, sleep for the remainder first. Returns the state
+% with the timestamp set to the time the notification was sent.
+notify(#st{} = St) ->
+    #st{holdoff = HoldOff, parent = Pid, timestamp = Ts, vs = VS} = St,
+    Now = erlang:system_time(millisecond),
+    Sleep = case Now - Ts of
+        % Clamp to [0, HoldOff] so that a clock step backwards cannot produce
+        % an over-long sleep.
+        Dt when Dt < HoldOff -> max(0, min(HoldOff, HoldOff - Dt));
+        _ -> 0
+    end,
+    timer:sleep(Sleep),
+    gen_server:call(Pid, {type_updated, VS}, infinity),
+    St#st{timestamp = Now + Sleep}.
+
+
+% Read the activity versionstamp for the type and set a watch on it, in one
+% jobs transaction.
+get_vs_and_watch(#st{jtx = JTx, type = Type}) ->
+    ReadAndWatch = fun(TxCtx) ->
+        couch_jobs_fdb:get_activity_vs_and_watch(TxCtx, Type)
+    end,
+    couch_jobs_fdb:tx(JTx, ReadAndWatch).