You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/06/03 17:44:01 UTC

[couchdb] branch prototype/rfc-couch-jobs updated (62f786e -> 4a904b5)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch prototype/rfc-couch-jobs
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard 62f786e  CouchDB background jobs
     new 4a904b5  CouchDB background jobs

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (62f786e)
            \
             N -- N -- N   refs/heads/prototype/rfc-couch-jobs (4a904b5)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch_jobs/src/couch_jobs.app.src      |   5 +-
 src/couch_jobs/src/couch_jobs.hrl          |  14 +--
 src/couch_jobs/src/couch_jobs_notifier.erl |   2 +
 src/couch_jobs/src/couch_jobs_server.erl   |  19 ++-
 src/couch_jobs/test/couch_jobs_tests.erl   | 187 +++++++++++++++++++++++++----
 5 files changed, 190 insertions(+), 37 deletions(-)


[couchdb] 01/01: CouchDB background jobs

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/rfc-couch-jobs
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 4a904b555a4d5f6bea178583357ff5bbcab7e181
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Thu May 16 19:11:50 2019 -0400

    CouchDB background jobs
    
    Done:
     - README file to describe each module
     - Supervisor and app structures
     - Main API module (couch_jobs.erl)
     - FDB read/write code in couch_jobs_fdb.erl
     - All jobs creation API: add(), remove(), submit(), get_job()
     - Workers API: finish(), resubmit(), update() and accept()
     - Directory path caching with metadata version checks
     - Activity monitor
     - Type monitor (start notifiers and activity monitors when new types detected)
     - Job state subscription: subscribe(), unsubscribe()
     - Tests. 90+% coverage
    
    Todo:
     - Example worker (maybe part of testing)
---
 rebar.config.script                                |   1 +
 rel/reltool.config                                 |   2 +
 src/couch_jobs/.gitignore                          |   4 +
 src/couch_jobs/README.md                           |  66 +++
 src/couch_jobs/rebar.config                        |  14 +
 src/couch_jobs/src/couch_jobs.app.src              |  31 ++
 src/couch_jobs/src/couch_jobs.erl                  | 300 +++++++++++
 src/couch_jobs/src/couch_jobs.hrl                  |  51 ++
 src/couch_jobs/src/couch_jobs_activity_monitor.erl | 137 +++++
 .../src/couch_jobs_activity_monitor_sup.erl        |  64 +++
 src/couch_jobs/src/couch_jobs_app.erl              |  26 +
 src/couch_jobs/src/couch_jobs_fdb.erl              | 565 +++++++++++++++++++
 src/couch_jobs/src/couch_jobs_notifier.erl         | 218 ++++++++
 src/couch_jobs/src/couch_jobs_notifier_sup.erl     |  64 +++
 src/couch_jobs/src/couch_jobs_pending.erl          | 159 ++++++
 src/couch_jobs/src/couch_jobs_server.erl           | 184 +++++++
 src/couch_jobs/src/couch_jobs_sup.erl              |  66 +++
 src/couch_jobs/src/couch_jobs_type_monitor.erl     |  78 +++
 src/couch_jobs/test/couch_jobs_tests.erl           | 597 +++++++++++++++++++++
 19 files changed, 2627 insertions(+)

diff --git a/rebar.config.script b/rebar.config.script
index 3b58bcb..2def724 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -76,6 +76,7 @@ SubDirs = [
     "src/couch_tests",
     "src/ddoc_cache",
     "src/fabric",
+    "src/couch_jobs",
     "src/global_changes",
     "src/mango",
     "src/rexi",
diff --git a/rel/reltool.config b/rel/reltool.config
index 1051d2e..afebc44 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -34,6 +34,7 @@
         couch,
         couch_epi,
         couch_index,
+        couch_jobs,
         couch_log,
         couch_mrview,
         couch_plugins,
@@ -90,6 +91,7 @@
     {app, config, [{incl_cond, include}]},
     {app, couch, [{incl_cond, include}]},
     {app, couch_epi, [{incl_cond, include}]},
+    {app, couch_jobs, [{incl_cond, include}]},
     {app, couch_index, [{incl_cond, include}]},
     {app, couch_log, [{incl_cond, include}]},
     {app, couch_mrview, [{incl_cond, include}]},
diff --git a/src/couch_jobs/.gitignore b/src/couch_jobs/.gitignore
new file mode 100644
index 0000000..6ef4c52
--- /dev/null
+++ b/src/couch_jobs/.gitignore
@@ -0,0 +1,4 @@
+*.beam
+.eunit
+ebin/couch_jobs.app
+.DS_Store
\ No newline at end of file
diff --git a/src/couch_jobs/README.md b/src/couch_jobs/README.md
new file mode 100644
index 0000000..b0552b2
--- /dev/null
+++ b/src/couch_jobs/README.md
@@ -0,0 +1,66 @@
+CouchDB Jobs Application
+========================
+
+Run background jobs in CouchDB
+
+Design (RFC) discussion: https://github.com/apache/couchdb-documentation/pull/409/files
+
+This is a description of some of the modules:
+
+ * `couch_jobs`: The main API module. It contains functions for creating,
+   accepting, executing, and monitoring jobs. A common pattern in this module
+   is to get a jobs transaction object (named `JTx` throughout the code), then
+   start a transaction and call a bunch of functions from `couch_jobs_fdb` in
+   that transaction.
+
+ * `couch_jobs_fdb`: This is a layer that talks to FDB. There is a lot of tuple
+   packing and unpacking, reading ranges and also managing transaction objects.
+
+ * `couch_jobs_pending`: This module implements the pending jobs queue. These
+   functions could all go in `couch_jobs_fdb` but the implementation was fairly
+   self-contained, with its own private helper functions, so it made sense to
+   move it to a separate module.
+
+ * `couch_jobs_activity_monitor`: Here is where the "activity monitor"
+   functionality is implemented. That's done with a `gen_server` instance
+   running for each type. This `gen_server` periodically checks if there are
+   inactive jobs for its type and, if so, re-enqueues them. If the timeout value
+   changes, then it skips the pending check until the new timeout expires.
+
+ * `couch_jobs_activity_monitor_sup` : This is a simple one-for-one supervisor
+   to spawn `couch_jobs_activity_monitor` instances for each type.
+
+ * `couch_jobs_type_monitor` : This is a helper process meant to be
+   spawn_linked from a parent `gen_server` and used to monitor activity for a
+   particular job type. If any jobs of that type have an update, it notifies the
+   parent process. To avoid firing for every single update, there is a
+   configurable `holdoff` parameter to wait a bit before notifying the parent
+   process. This functionality is used by the `couch_jobs_notifier` to drive
+   job state subscriptions.
+
+ * `couch_jobs_notifier`: Is responsible for job state subscriptions. Just like
+   with activity monitors there is a `gen_server` instance of this running per
+   each type. It uses a linked `couch_jobs_type_monitor` process to wait for
+   any job updates. When an update notification arrives, it can efficiently
+   find out if any active jobs have been updated, by reading the `(?JOBS,
+   ?ACTIVITY, Type, Sequence)` range. That should account for the bulk of
+   changes. The jobs that are not active anymore, are queried individually.
+   Subscriptions are managed in an ordered set ETS table with the schema that
+   looks like `{{JobId, Ref}, {Fun, Pid, JobState}}`. The `Ref` is a reference
+   used to track individual subscriptions and also used to monitor subscriber
+   processes in case they die (then it auto-unsubscribes them).
+
+ * `couch_jobs_notifier_sup`: A simple one-for-one supervisor to spawn
+   `couch_jobs_notifier` processes for each type.
+
+ * `couch_jobs_server`: This is a `gen_server` which keeps track of job
+   types. It then starts or stops activity monitors and notifiers for each
+   type. To do that it queries the ` (?JOBS, ?ACTIVITY_TIMEOUT)` periodically.
+
+ * `couch_jobs_sup`: This is the application supervisor. The restart strategy
+   is `rest_for_one`, meaning that when a child restarts, the siblings
+   following it restart as well. One interesting entry there is the first child
+   which is used just to create an ETS table used by `couch_jobs_fdb` to cache
+   transaction objects (`JTx` mentioned above). That child calls `init_cache/0`,
+   where it creates the ETS then returns with `ignore` so it doesn't actually
+   spawn a process. The ETS table will be owned by the supervisor process.
diff --git a/src/couch_jobs/rebar.config b/src/couch_jobs/rebar.config
new file mode 100644
index 0000000..362c878
--- /dev/null
+++ b/src/couch_jobs/rebar.config
@@ -0,0 +1,14 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{cover_enabled, true}.
+{cover_print_enabled, true}.
diff --git a/src/couch_jobs/src/couch_jobs.app.src b/src/couch_jobs/src/couch_jobs.app.src
new file mode 100644
index 0000000..8ded14c
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.app.src
@@ -0,0 +1,31 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_jobs, [
+    {description, "CouchDB Jobs"},
+    {vsn, git},
+    {mod, {couch_jobs_app, []}},
+    {registered, [
+        couch_jobs_sup,
+        couch_jobs_activity_monitor_sup,
+        couch_jobs_notifier_sup,
+        couch_jobs_server
+    ]},
+    {applications, [
+        kernel,
+        stdlib,
+        erlfdb,
+        couch_log,
+        config,
+        fabric
+    ]}
+]}.
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
new file mode 100644
index 0000000..20cfb5d
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -0,0 +1,300 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs).
+
+-export([
+    add/3,
+    remove/2,
+    stop_and_remove/3,
+    resubmit/2,
+    resubmit/3,
+    get_job/2,
+    get_jobs/1,
+    get_jobs/0,
+
+    accept/1,
+    accept/2,
+    finish/5,
+    resubmit/5,
+    update/5,
+
+    subscribe/2,
+    subscribe/3,
+    unsubscribe/1,
+    wait_job_state/2,
+    wait_job_state/3,
+
+    set_type_timeout/2,
+    clear_type_timeout/1,
+    get_type_timeout/1
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+%% Job Creation API
+
+-spec add(job_type(), job_id(), job_opts()) -> ok | {error, any()}.
+add(Type, JobId, JobOpts) ->
+    try validate_job_opts(JobOpts) of % only the validation call is protected
+        ok ->
+            couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+                couch_jobs_fdb:add(JTx, Type, JobId, JobOpts)
+            end)
+    catch
+        Tag:Err -> {error, {invalid_args, {Tag, Err}}} % rejected JobOpts
+    end.
+
+
+-spec remove(job_type(), job_id()) -> ok | not_found | canceled.
+remove(Type, JobId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:remove(JTx, Type, JobId)
+    end).
+
+
+-spec stop_and_remove(job_type(), job_id(), timeout()) ->
+    ok | not_found | timeout.
+stop_and_remove(Type, JobId, Timeout) ->
+    case remove(Type, JobId) of
+        not_found ->
+            not_found;
+        ok ->
+            ok;
+        canceled -> % wait for the job to reach `finished`, then remove it
+            case subscribe(Type, JobId) of
+                not_found ->
+                    not_found;
+                finished ->
+                    ok = remove(Type, JobId);
+                {ok, SubId, _JobState} ->
+                    case wait_job_state(SubId, finished, Timeout) of
+                        timeout -> % NOTE(review): no unsubscribe/1 on this path
+                            timeout;
+                        {Type, JobId, finished} ->
+                            ok = remove(Type, JobId)
+                    end
+            end % NOTE(review): {error, _} from subscribe/2 is not matched
+    end.
+
+
+-spec resubmit(job_type(), job_id()) -> ok | not_found.
+resubmit(Type, JobId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:resubmit(JTx, Type, JobId, undefined)
+    end).
+
+
+-spec resubmit(job_type(), job_id(), job_priority()) -> ok | not_found.
+resubmit(Type, JobId, NewPriority) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:resubmit(JTx, Type, JobId, NewPriority)
+    end).
+
+
+-spec get_job(job_type(), job_id()) -> {ok, job_opts(), job_state()}
+    | not_found.
+get_job(Type, JobId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:get_job(JTx, Type, JobId)
+    end).
+
+
+-spec get_jobs(job_type()) -> [{job_id(), job_state(), job_opts()}].
+get_jobs(Type) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:get_jobs(JTx, Type)
+    end).
+
+
+-spec get_jobs() -> [{job_type(), job_id(), job_state(), job_opts()}].
+get_jobs() ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:get_jobs(JTx)
+    end).
+
+
+%% Worker Implementation API
+
+-spec accept(job_type()) -> {ok, job_id(), worker_lock()} | not_found.
+accept(Type) ->
+    accept(Type, undefined).
+
+
+-spec accept(job_type(), job_priority()) -> {ok, job_id(), worker_lock()}
+    | not_found.
+accept(Type, MaxPriority) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:accept(JTx, Type, MaxPriority)
+    end).
+
+
+-spec finish(jtx(), job_type(), job_id(), job_opts(), worker_lock()) -> ok |
+    worker_conflict | no_return().
+finish(Tx, Type, JobId, JobOpts, WorkerLockId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:finish(JTx, Type, JobId, JobOpts, WorkerLockId)
+    end).
+
+
+-spec resubmit(jtx(), job_type(), job_id(), job_priority() | undefined,
+    worker_lock()) -> ok | worker_conflict | canceled | no_return().
+resubmit(Tx, Type, JobId, NewPriority, WorkerLockId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:resubmit(JTx, Type, JobId, NewPriority, WorkerLockId)
+    end).
+
+
+-spec update(jtx(), job_type(), job_id(), job_opts(), worker_lock()) -> ok |
+    worker_conflict | canceled | no_return().
+update(Tx, Type, JobId, JobOpts, WorkerLockId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:update(JTx, Type, JobId, JobOpts, WorkerLockId)
+    end).
+
+
+%% Subscription API
+
+% Receive events as messages. Wait for them using `wait_job_state/2,3`
+% functions. The notifier's `not_found` and `finished` replies are passed
+% through unchanged, so callers must handle both atoms.
+-spec subscribe(job_type(), job_id()) -> {ok, job_subscription(), job_state()}
+    | not_found | finished | {error, any()}.
+subscribe(Type, JobId) ->
+    case couch_jobs_server:get_notifier_server(Type) of
+        {ok, Server} ->
+            case couch_jobs_notifier:subscribe(Server, JobId, self()) of
+                {Ref, JobState} -> {ok, {Server, Ref}, JobState};
+                not_found -> not_found;
+                finished -> finished
+            end;
+        {error, Error} ->
+            {error, Error}
+    end.
+
+
+% Receive events as callbacks. Callback arguments will be:
+%    Fun(SubscriptionRef, Type, JobId, JobState)
+% NOTE(review): guard requires arity 3, but 4 arguments are listed; confirm.
+% Returns:
+%   - {ok, SubscriptionRef, CurrentState} where:
+%      - SubscriptionRef is an opaque reference for the subscription
+%      - CurrentState is the current job state
+%   - `not_found` if job was not found
+%   - `finished` if job is already in the `finished` state
+%   - {error, Error} if the notifier server for Type could not be obtained
+-spec subscribe(job_type(), job_id(), job_callback()) -> {ok,
+    job_subscription(), job_state()} | not_found | finished | {error, any()}.
+subscribe(Type, JobId, Fun) when is_function(Fun, 3) ->
+    case couch_jobs_server:get_notifier_server(Type) of
+        {ok, Server} ->
+            case couch_jobs_notifier:subscribe(Server, JobId, Fun, self()) of
+                {Ref, JobState} -> {ok, {Server, Ref}, JobState};
+                not_found -> not_found;
+                finished -> finished
+            end;
+        {error, Error} ->
+            {error, Error}
+    end.
+
+
+% Unsubscribe from getting notifications based on a particular subscription.
+% Each subscription should be followed by its own unsubscription call. However,
+% subscriber processes are also monitored and auto-unsubscribed if they exit.
+% So if the subscribing process is exiting, calling this function is optional.
+%
+-spec unsubscribe(job_subscription()) -> ok.
+unsubscribe({Server, Ref}) when is_pid(Server), is_reference(Ref) ->
+    try
+        couch_jobs_notifier:unsubscribe(Server, Ref)
+    after
+        flush_notifications(Ref) % drop events for Ref already in the mailbox
+    end.
+
+
+% Wait to receive job state updates as messages.
+%
+-spec wait_job_state(job_subscription(), timeout()) -> {job_type(), job_id(),
+    job_state()} | timeout.
+wait_job_state({_, Ref}, Timeout) ->
+    receive
+        {?COUCH_JOBS_EVENT, Ref, Type, JobId, JobState} ->
+            {Type, JobId, JobState}
+    after
+        Timeout -> timeout
+    end.
+
+
+% Wait for a particular job state received as a message. Events carrying any
+% other state are not consumed here and remain in the caller's mailbox.
+-spec wait_job_state(job_subscription(), job_state(), timeout()) ->
+    {job_type(), job_id(), job_state()} | timeout.
+wait_job_state({_, Ref}, JobState, Timeout) ->
+    receive
+        {?COUCH_JOBS_EVENT, Ref, Type, JobId, JobState} ->
+            {Type, JobId, JobState}
+    after
+        Timeout -> timeout
+    end.
+
+
+%% Job type timeout API
+
+% These functions manipulate the activity timeout for each job type.
+
+-spec set_type_timeout(job_type(), timeout()) -> ok.
+set_type_timeout(Type, Timeout) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:set_type_timeout(JTx, Type, Timeout)
+    end).
+
+
+-spec clear_type_timeout(job_type()) -> ok.
+clear_type_timeout(Type) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:clear_type_timeout(JTx, Type)
+    end).
+
+
+-spec get_type_timeout(job_type()) -> timeout().
+get_type_timeout(Type) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:get_type_timeout(JTx, Type)
+    end).
+
+
+%% Private utilities
+
+validate_job_opts(#{} = JobOpts) ->
+    jiffy:encode(JobOpts),
+    case maps:get(?OPT_RESUBMIT, JobOpts, undefined) of
+        undefined -> ok;
+        true -> ok;
+        Resubmit -> error({invalid_resubmit, Resubmit, JobOpts})
+    end,
+    case maps:get(?OPT_PRIORITY, JobOpts, undefined) of
+        undefined -> ok;
+        Binary when is_binary(Binary) -> ok;
+        Int when is_integer(Int), Int >= 0 -> ok;
+        Priority -> error({invalid_priority, Priority, JobOpts})
+    end.
+
+
+flush_notifications(Ref) ->
+    receive
+        {?COUCH_JOBS_EVENT, Ref, _, _, _} ->
+            flush_notifications(Ref)
+    after
+        0 -> ok
+    end.
diff --git a/src/couch_jobs/src/couch_jobs.hrl b/src/couch_jobs/src/couch_jobs.hrl
new file mode 100644
index 0000000..dc3e7ff
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.hrl
@@ -0,0 +1,51 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+% JobOpts map/json field definitions
+%
+-define(OPT_PRIORITY, <<"priority">>).
+-define(OPT_DATA, <<"data">>).
+-define(OPT_CANCEL, <<"cancel">>).
+-define(OPT_RESUBMIT, <<"resubmit">>).
+
+% These might be in a fabric public hrl eventually
+%
+-define(uint2bin(I), binary:encode_unsigned(I, little)).
+-define(bin2uint(I), binary:decode_unsigned(I, little)).
+-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}).
+-define(METADATA_VERSION_KEY, <<"$metadata_version_key$">>).
+
+% Data model definitions
+%
+-define(JOBS, 51). % coordinate with fabric2.hrl
+-define(DATA, 1).
+-define(PENDING, 2).
+-define(WATCHES, 3).
+-define(ACTIVITY_TIMEOUT, 4).
+-define(ACTIVITY, 5).
+
+
+% Couch jobs event notifier tag
+-define(COUCH_JOBS_EVENT, '$couch_jobs_event').
+
+
+-type jtx() :: map().
+-type job_id() :: binary().
+-type job_type() :: tuple() | binary() | non_neg_integer().
+-type job_opts() :: map().
+-type job_state() :: running | pending | finished.
+-type job_priority() :: tuple() | binary() | non_neg_integer().
+-type job_subscription() :: {pid(), reference()}.
+-type job_callback() :: fun((job_subscription(), job_type(), job_id(),
+    job_state()) -> ok).
+-type worker_lock() :: binary().
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor.erl b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
new file mode 100644
index 0000000..07ff66a
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
@@ -0,0 +1,137 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_activity_monitor).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/1
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+-record(st, {
+    jtx,
+    type,
+    tref,
+    timeout = 0,
+    vs = not_found
+}).
+
+
+-define(MAX_JITTER_DEFAULT, 15000).
+-define(MISSING_TIMEOUT_CHECK, 5000).
+
+
+start_link(Type) ->
+    gen_server:start_link(?MODULE, [Type], []).
+
+
+%% gen_server callbacks
+
+init([Type]) ->
+    St = #st{jtx = couch_jobs_fdb:get_jtx(), type = Type},
+    {ok, schedule_check(St)}.
+
+
+terminate(_, _St) ->
+    ok.
+
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(check_activity, St) ->
+    St1 = check_activity(St),
+    St2 = schedule_check(St1),
+    {noreply, St2};
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+% Private helper functions
+
+check_activity(#st{vs = not_found} = St) -> % no baseline yet: just record one
+    St#st{vs = get_vs(St)};
+
+check_activity(#st{} = St) ->
+    NewVS = get_vs(St), % read first; the calls below still use the old vs
+    case get_inactive_since(St) of
+        [] -> ok;
+        [_ | _] -> re_enqueue_inactive(St)
+    end,
+    St#st{vs = NewVS}.
+
+
+get_timeout_msec(JTx, Type) ->
+    TimeoutVal = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_type_timeout(JTx1, Type)
+    end),
+    case TimeoutVal of
+        not_found -> not_found;
+        ValSeconds -> timer:seconds(ValSeconds)
+    end.
+
+
+schedule_check(#st{jtx = JTx, type = Type, timeout = OldTimeout} = St) ->
+    % Reset versionstamp if timeout changed (or if no timeout is set for Type).
+    St1 = case get_timeout_msec(JTx, Type) of
+        not_found -> St#st{vs = not_found, timeout = ?MISSING_TIMEOUT_CHECK};
+        OldTimeout -> St;
+        NewTimeout -> St#st{vs = not_found, timeout = NewTimeout}
+    end,
+    #st{timeout = Timeout} = St1,
+    MaxJitter = min(Timeout div 2, get_max_jitter_msec()), % cap at Timeout/2
+    Wait = Timeout + rand:uniform(max(1, MaxJitter)), % uniform/1 needs N >= 1
+    St1#st{tref = erlang:send_after(Wait, self(), check_activity)}.
+
+
+get_vs(#st{jtx = JTx, type = Type}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_activity_vs(JTx1, Type)
+    end).
+
+
+re_enqueue_inactive(#st{jtx = JTx, type = Type, vs = VS}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:re_enqueue_inactive(JTx1, Type, VS)
+    end).
+
+
+get_inactive_since(#st{jtx = JTx, type = Type, vs = VS}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_inactive_since(JTx1, Type, VS)
+    end).
+
+
+get_max_jitter_msec()->
+    config:get_integer("couch_jobs", "activity_monitor_max_jitter_msec",
+        ?MAX_JITTER_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl
new file mode 100644
index 0000000..b11161a
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl
@@ -0,0 +1,64 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_activity_monitor_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0,
+
+    start_monitor/1,
+    stop_monitor/1,
+    get_child_pids/0
+]).
+
+-export([
+    init/1
+]).
+
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+start_monitor(Type) ->
+    supervisor:start_child(?MODULE, [Type]).
+
+
+stop_monitor(Pid) ->
+    supervisor:terminate_child(?MODULE, Pid).
+
+
+get_child_pids() ->
+    % Pid is the second element of each supervisor:which_children/1 entry.
+    ToPid = fun({_Id, Pid, _Type, _Mod}) -> Pid end,
+    lists:map(ToPid, supervisor:which_children(?MODULE)).
+
+
+init(_) ->
+    Flags = #{
+        strategy => simple_one_for_one,
+        intensity => 10,
+        period => 3
+    },
+    Children = [
+        #{
+            id => couch_jobs_monitor,
+            restart => temporary,
+            start => {couch_jobs_activity_monitor, start_link, []}
+        }
+    ],
+    {ok, {Flags, Children}}.
diff --git a/src/couch_jobs/src/couch_jobs_app.erl b/src/couch_jobs/src/couch_jobs_app.erl
new file mode 100644
index 0000000..720b948
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_app.erl
@@ -0,0 +1,26 @@
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_app).
+
+
+-behaviour(application).
+
+
+-export([
+    start/2,
+    stop/1
+]).
+
+
+start(_Type, []) ->
+    couch_jobs_sup:start_link().
+
+
+stop([]) ->
+    ok.
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
new file mode 100644
index 0000000..0935168
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -0,0 +1,565 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_fdb).
+
+
+-export([
+    add/4,
+    remove/3,
+    resubmit/4,
+    get_job/3,
+    get_jobs/2,
+    get_jobs/1,
+
+    accept/3,
+    finish/5,
+    resubmit/5,
+    update/5,
+
+    set_type_timeout/3,
+    clear_type_timeout/2,
+    get_type_timeout/2,
+    get_types/1,
+
+    get_activity_vs/2,
+    get_activity_vs_and_watch/2,
+    get_active_since/3,
+    get_inactive_since/3,
+    re_enqueue_inactive/3,
+
+    init_cache/0,
+
+    get_jtx/0,
+    get_jtx/1,
+    tx/2,
+
+    has_versionstamp/1,
+
+    clear_jobs/0,
+    clear_type/1
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+% Data model
+%
+% (?JOBS, ?DATA, Type, JobId) = (Sequence, WorkerLockId, Priority, JobOpts)
+% (?JOBS, ?PENDING, Type, Priority, JobId) = ""
+% (?JOBS, ?WATCHES, Type) = Sequence
+% (?JOBS, ?ACTIVITY_TIMEOUT, Type) = ActivityTimeout
+% (?JOBS, ?ACTIVITY, Type, Sequence) = JobId
+
+% Job creation API
+
+% Create a new job. Returns:
+%   {error, no_type_timeout} - no activity timeout is stored for Type
+%   {error, duplicate_job}   - a value already exists under the job's key
+%   ok                       - the job was written via maybe_enqueue/5
+%
+add(#{jtx := true} = JTx0, Type, JobId, JobOpts) ->
+    JTx = get_jtx(JTx0),
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    case get_type_timeout(JTx, Type) of
+        not_found ->
+            {error, no_type_timeout};
+        Timeout when is_integer(Timeout) ->
+            DataKey = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+            Existing = erlfdb:wait(erlfdb:get(Tx, DataKey)),
+            case Existing of
+                not_found ->
+                    maybe_enqueue(JTx, Type, JobId, JobOpts, true);
+                <<_/binary>> ->
+                    {error, duplicate_job}
+            end
+    end.
+
+
+% Remove a job. The outcome depends on the stored job tuple:
+%   * non-null WorkerLockId: the job is only flagged through cancel/3 and
+%     `canceled` is returned; its data key is kept.
+%   * null WorkerLockId, non-null Priority: the pending queue entry and the
+%     data key are both cleared, returns `ok`.
+%   * both null: only the data key is cleared, returns `ok`.
+%   * no stored job: returns `not_found`.
+%
+remove(#{jtx := true} = JTx0, Type, JobId) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job(Tx, Key) of
+        {_, WorkerLockId, _Priority, _} = Job when WorkerLockId =/= null ->
+            ok = cancel(JTx, Key, Job),
+            canceled;
+        {_, _WorkerLockId, Priority, _} when Priority =/= null ->
+            couch_jobs_pending:remove(JTx, Type, Priority, JobId),
+            erlfdb:clear(Tx, Key),
+            ok;
+        {_, _WorkerLockId, _Priority, _} ->
+            erlfdb:clear(Tx, Key),
+            ok;
+        not_found ->
+            not_found
+    end.
+
+
+% Mark an existing job for resubmission by setting ?OPT_RESUBMIT => true in
+% its options (and the new priority, unless NewPriority is `undefined`).
+% Seq, WorkerLockId and Priority of the stored tuple are written back
+% unchanged. Returns `ok`, or `not_found` if the job does not exist.
+%
+resubmit(#{jtx := true} = JTx, Type, JobId, NewPriority) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    DataKey = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job(Tx, DataKey) of
+        not_found ->
+            not_found;
+        {Seq, WorkerLockId, Priority, #{} = CurOpts} ->
+            NewOpts = update_priority(CurOpts#{?OPT_RESUBMIT => true},
+                NewPriority),
+            NewVal = erlfdb_tuple:pack({Seq, WorkerLockId, Priority,
+                jiffy:encode(NewOpts)}),
+            erlfdb:set(Tx, DataKey, NewVal),
+            ok
+    end.
+
+
+% Look up a single job. Returns {ok, JobOpts, State}, where State is
+% computed by job_state/2, or `not_found`.
+%
+get_job(#{jtx := true} = JTx, Type, JobId) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    DataKey = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job(Tx, DataKey) of
+        not_found ->
+            not_found;
+        {_Seq, WorkerLockId, Priority, JobOpts} ->
+            State = job_state(WorkerLockId, Priority),
+            {ok, JobOpts, State}
+    end.
+
+
+% List all jobs of the given Type as {JobId, JobState, JobOpts} tuples, by
+% reading the whole (?DATA, Type) key range.
+%
+get_jobs(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tuple:pack({?DATA, Type}, Jobs),
+    Rows = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix,
+        [{streaming_mode, want_all}])),
+    [begin
+        {JobId} = erlfdb_tuple:unpack(Key, Prefix),
+        {_Seq, WorkerLockId, Priority, JobOpts} = unpack_job(Val),
+        {JobId, job_state(WorkerLockId, Priority), JobOpts}
+    end || {Key, Val} <- Rows].
+
+
+% List all jobs of every type as {Type, JobId, JobState, JobOpts} tuples, by
+% reading the whole ?DATA key range.
+%
+get_jobs(#{jtx := true} = JTx) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tuple:pack({?DATA}, Jobs),
+    Rows = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix,
+        [{streaming_mode, want_all}])),
+    [begin
+        {Type, JobId} = erlfdb_tuple:unpack(Key, Prefix),
+        {_Seq, WorkerLockId, Priority, JobOpts} = unpack_job(Val),
+        {Type, JobId, job_state(WorkerLockId, Priority), JobOpts}
+    end || {Key, Val} <- Rows].
+
+
+% Worker public API
+
+% Take a job of the given Type off the pending queue (see
+% couch_jobs_pending:dequeue/3 for the meaning of MaxPriority). On success a
+% fresh WorkerLockId is generated, stored in the job's data via
+% update_lock/4, and a new activity entry is written. Returns
+% {ok, JobId, WorkerLockId}, or `not_found` when nothing could be dequeued.
+%
+accept(#{jtx := true} = JTx0, Type, MaxPriority) ->
+    #{jtx := true} = JTx = get_jtx(JTx0),
+    case couch_jobs_pending:dequeue(JTx, Type, MaxPriority) of
+        not_found ->
+            not_found;
+        <<_/binary>> = JobId ->
+            WorkerLockId = fabric2_util:uuid(),
+            update_lock(JTx, Type, JobId, WorkerLockId),
+            % Seq is null: the job has no previous activity entry to clear
+            update_activity(JTx, Type, JobId, null),
+            {ok, JobId, WorkerLockId}
+    end.
+
+
+% Finish a job held by WorkerLockId. JobOpts is merged over the stored
+% options; the job is re-enqueued by maybe_enqueue/5 if the merged options
+% have ?OPT_RESUBMIT set to true, otherwise it is stored with null lock and
+% priority. The activity entry for the job's Seq is cleared and the type
+% watch is bumped. Returns `ok`, or `worker_conflict` when the stored lock
+% id differs from WorkerLockId or the job does not exist.
+%
+finish(#{jtx := true} = JTx0, Type, JobId, JobOpts, WorkerLockId) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job_and_status(Tx, Key, WorkerLockId) of
+        {Status, {Seq, _, _, JobOptsCur}} when
+                Status =:= ok orelse Status =:= canceled ->
+            % If the job was canceled, allow updating its data one last time
+            JobOpts1 = maps:merge(JobOptsCur, JobOpts),
+            Resubmit = maps:get(?OPT_RESUBMIT, JobOpts1, false) == true,
+            maybe_enqueue(JTx, Type, JobId, JobOpts1, Resubmit),
+            clear_activity(JTx, Type, Seq),
+            update_watch(JTx, Type),
+            ok;
+        {worker_conflict, _} ->
+            worker_conflict
+    end.
+
+
+% Worker-side resubmit: the worker holding WorkerLockId flags its running job
+% to be resubmitted once it finishes, optionally with a new priority
+% (NewPriority = `undefined` keeps the current one). Returns `ok`, or the
+% non-ok status from get_job_and_status/3 (`canceled` or `worker_conflict`).
+%
+resubmit(#{jtx := true} = JTx0, Type, JobId, NewPriority, WorkerLockId) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job_and_status(Tx, Key, WorkerLockId) of
+        {ok, {Seq, WorkerLockId, null, JobOpts}} ->
+            JobOpts1 = JobOpts#{?OPT_RESUBMIT => true},
+            JobOpts2 = update_priority(JobOpts1, NewPriority),
+            % update_job/6 replaces the activity entry for Seq and bumps the
+            % type watch itself, so a separate update_activity/4 call here
+            % would only repeat the same writes.
+            update_job(JTx, Type, JobId, WorkerLockId, JobOpts2, Seq),
+            ok;
+        {Status, _} when Status =/= ok ->
+            Status
+    end.
+
+
+% Update the options of a running job held by WorkerLockId. JobOpts is
+% merged over the stored options and written through update_job/6. Returns
+% `ok`, or the non-ok status from get_job_and_status/3.
+%
+update(#{jtx := true} = JTx, Type, JobId, JobOpts, WorkerLockId) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    DataKey = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job_and_status(Tx, DataKey, WorkerLockId) of
+        {Status, _} when Status =/= ok ->
+            Status;
+        {ok, {Seq, WorkerLockId, null, CurOpts}} ->
+            Merged = maps:merge(CurOpts, JobOpts),
+            update_job(JTx, Type, JobId, WorkerLockId, Merged, Seq),
+            ok
+    end.
+
+
+% Type and activity monitoring API
+
+% Store Timeout, packed as a 1-tuple, under (?ACTIVITY_TIMEOUT, Type).
+%
+set_type_timeout(#{jtx := true} = JTx, Type, Timeout) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    TimeoutKey = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+    erlfdb:set(Tx, TimeoutKey, erlfdb_tuple:pack({Timeout})).
+
+
+% Clear the (?ACTIVITY_TIMEOUT, Type) key.
+%
+clear_type_timeout(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+    erlfdb:clear(Tx, Key).
+
+
+% Read the timeout stored under (?ACTIVITY_TIMEOUT, Type) using
+% erlfdb:get_ss/2. Returns the unpacked value or `not_found`.
+%
+get_type_timeout(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    TimeoutKey = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+    Stored = erlfdb:wait(erlfdb:get_ss(Tx, TimeoutKey)),
+    case Stored of
+        not_found ->
+            not_found;
+        _ ->
+            {Timeout} = erlfdb_tuple:unpack(Stored),
+            Timeout
+    end.
+
+
+% List every Type that has a key in the ?ACTIVITY_TIMEOUT subspace.
+%
+get_types(#{jtx := true} = JTx) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT}, Jobs),
+    Rows = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix,
+        [{streaming_mode, want_all}])),
+    [begin
+        {Type} = erlfdb_tuple:unpack(Key, Prefix),
+        Type
+    end || {Key, _Val} <- Rows].
+
+
+% Read the value stored under (?WATCHES, Type). Returns the single element
+% of the unpacked tuple or `not_found`.
+%
+get_activity_vs(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    WatchKey = erlfdb_tuple:pack({?WATCHES, Type}, Jobs),
+    Stored = erlfdb:wait(erlfdb:get(Tx, WatchKey)),
+    case Stored of
+        not_found ->
+            not_found;
+        _ ->
+            {VS} = erlfdb_tuple:unpack(Stored),
+            VS
+    end.
+
+
+% Same lookup as get_activity_vs/2, but also registers an erlfdb watch on
+% the (?WATCHES, Type) key in the same transaction. Returns {VS, Watch} or
+% {not_found, Watch}.
+%
+get_activity_vs_and_watch(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Key = erlfdb_tuple:pack({?WATCHES, Type}, Jobs),
+    % Issue the read and the watch before waiting on the read
+    Future = erlfdb:get(Tx, Key),
+    Watch = erlfdb:watch(Tx, Key),
+    case erlfdb:wait(Future) of
+        not_found ->
+            {not_found, Watch};
+        Val ->
+            {VS} = erlfdb_tuple:unpack(Val),
+            {VS, Watch}
+    end.
+
+
+% Return the JobIds stored in the (?ACTIVITY, Type) range at keys strictly
+% greater than (Type, Versionstamp).
+%
+get_active_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    ActivityPrefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+    AfterVs = erlfdb_key:first_greater_than(
+        erlfdb_tuple:pack({Type, Versionstamp}, ActivityPrefix)),
+    {_, RangeEnd} = erlfdb_tuple:range({Type}, ActivityPrefix),
+    Rows = erlfdb:wait(erlfdb:get_range(Tx, AfterVs, RangeEnd,
+        [{streaming_mode, want_all}])),
+    [JobId || {_Key, JobId} <- Rows].
+
+
+% Return the JobIds stored in the (?ACTIVITY, Type) range from its start up
+% to and including the key (Type, Versionstamp).
+%
+get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    ActivityPrefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+    {RangeStart, _} = erlfdb_tuple:range({Type}, ActivityPrefix),
+    UpToVs = erlfdb_key:first_greater_than(
+        erlfdb_tuple:pack({Type, Versionstamp}, ActivityPrefix)),
+    Rows = erlfdb:wait(erlfdb:get_range(Tx, RangeStart, UpToVs,
+        [{streaming_mode, want_all}])),
+    [JobId || {_Key, JobId} <- Rows].
+
+
+% Re-enqueue every job of Type whose activity entry is at or before
+% Versionstamp (see get_inactive_since/3). For each such job the activity
+% entry is cleared and the job is written back through maybe_enqueue/5 with
+% Resubmit = true. The type watch is bumped once if at least one job was
+% processed. Returns the list of affected JobIds.
+%
+% The refreshed JTx returned by get_jtx/1 is used for all helper calls so
+% that every read and write in this function uses the same jobs_path.
+%
+re_enqueue_inactive(#{jtx := true} = JTx0, Type, Versionstamp) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0),
+    JobIds = get_inactive_since(JTx, Type, Versionstamp),
+    lists:foreach(fun(JobId) ->
+        Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+        {Seq, _, _, JobOpts} = get_job(Tx, Key),
+        clear_activity(JTx, Type, Seq),
+        maybe_enqueue(JTx, Type, JobId, JobOpts, true)
+    end, JobIds),
+    case JobIds of
+        [_ | _] -> update_watch(JTx, Type);
+        [] -> ok
+    end,
+    JobIds.
+
+
+% Cache initialization API. Meant to be called from the supervisor just to
+% create the public, named ETS table used by get_jtx/1. It returns `ignore`
+% to tell the supervisor that no process is actually started, which is what
+% we want here.
+%
+init_cache() ->
+    TableOpts = [
+        public,
+        named_table,
+        {read_concurrency, true},
+        {write_concurrency, true}
+    ],
+    ets:new(?MODULE, TableOpts),
+    ignore.
+
+
+% Cached job transaction object. This object wraps a transaction and caches
+% the directory lookup path and the metadata version. The function can be
+% used from inside or outside a transaction. When used from inside a
+% transaction it verifies whether the metadata version has changed and
+% refreshes the cached object if so.
+%
+get_jtx() ->
+    get_jtx(undefined).
+
+
+% get_jtx/1 accepts a map carrying a `tx` field, `undefined`, or an erlfdb
+% transaction:
+%   * map: the call is repeated with the map's `tx` value.
+%   * undefined: the cached object is returned; on a cache miss a new one is
+%     built in its own transaction, cached, and returned with tx undefined.
+%   * transaction: the cached object is returned with `tx` set to Tx after
+%     ensure_current/1 has checked it; on a cache miss a new one is built
+%     with Tx and cached.
+%
+get_jtx(#{tx := Tx} = _TxDb) ->
+    get_jtx(Tx);
+
+get_jtx(undefined = _Tx) ->
+    case ets:lookup(?MODULE, ?JOBS) of
+        [{_, #{} = JTx}] ->
+            JTx;
+        [] ->
+            JTx = update_jtx_cache(init_jtx(undefined)),
+            JTx#{tx := undefined}
+    end;
+
+get_jtx({erlfdb_transaction, _} = Tx) ->
+    case ets:lookup(?MODULE, ?JOBS) of
+        [{_, #{} = JTx}] ->
+            ensure_current(JTx#{tx := Tx});
+        [] ->
+            update_jtx_cache(init_jtx(Tx))
+    end.
+
+
+% Transaction processing to be used with couch_jobs-specific transaction
+% contexts. Runs the arity-1 Fun through fabric2_fdb:transactional/2 with the
+% given JTx.
+%
+tx(#{jtx := true} = JTx, Fun) when is_function(Fun, 1) ->
+    fabric2_fdb:transactional(JTx, Fun).
+
+
+% Utility fdb functions used by other modules in couch_jobs. Maybe move these
+% to a separate module if the list keeps growing.
+
+% Return true if the term is ?UNSET_VS or contains ?UNSET_VS anywhere inside
+% nested tuples or lists; false otherwise.
+%
+has_versionstamp(?UNSET_VS) ->
+    true;
+
+has_versionstamp(Tuple) when is_tuple(Tuple) ->
+    has_versionstamp(tuple_to_list(Tuple));
+
+has_versionstamp([Elem | Rest]) ->
+    has_versionstamp(Elem) orelse has_versionstamp(Rest);
+
+has_versionstamp(_Other) ->
+    false.
+
+
+% Debug and testing API
+
+% Clear every key under the jobs path, in its own transaction.
+%
+clear_jobs() ->
+    fabric2_fdb:transactional(fun(Tx) ->
+        #{jobs_path := Jobs} = init_jtx(Tx),
+        erlfdb:clear_range_startswith(Tx, Jobs)
+    end).
+
+
+% Clear the keys belonging to Type in each of the ?DATA, ?PENDING, ?WATCHES,
+% ?ACTIVITY_TIMEOUT and ?ACTIVITY sections, in a single transaction.
+%
+clear_type(Type) ->
+    fabric2_fdb:transactional(fun(Tx) ->
+        #{jobs_path := Jobs} = init_jtx(Tx),
+        ClearSection = fun(Section) ->
+            SectionPrefix = erlfdb_tuple:pack({Section, Type}, Jobs),
+            erlfdb:clear_range_startswith(Tx, SectionPrefix)
+        end,
+        lists:foreach(ClearSection,
+            [?DATA, ?PENDING, ?WATCHES, ?ACTIVITY_TIMEOUT, ?ACTIVITY])
+    end).
+
+
+% Private helper functions
+
+% Rewrite the data of a running job: replace its activity entry (clearing
+% the one at OldSeq) and store {NewSeq, WorkerLockId, null, JobOpts}, where
+% NewSeq is filled in as a versionstamp when the value is written.
+%
+% The refreshed JTx from get_jtx/1 is also passed to update_activity/4, so
+% the activity entry and the job data are written under the same jobs_path.
+%
+update_job(JTx0, Type, JobId, WorkerLockId, JobOpts, OldSeq) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx = get_jtx(JTx0),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    update_activity(JTx, Type, JobId, OldSeq),
+    ValTup = {?UNSET_VS, WorkerLockId, null, jiffy:encode(JobOpts)},
+    Val = erlfdb_tuple:pack_vs(ValTup),
+    erlfdb:set_versionstamped_value(Tx, Key, Val).
+
+
+% Set ?OPT_PRIORITY in JobOpts to NewPriority. JobOpts is returned unchanged
+% when NewPriority is `undefined` or already equals the stored priority.
+%
+update_priority(JobOpts, undefined) ->
+    JobOpts;
+
+update_priority(JobOpts, NewPriority) ->
+    case maps:get(?OPT_PRIORITY, JobOpts, undefined) of
+        NewPriority -> JobOpts;
+        _Different -> JobOpts#{?OPT_PRIORITY => NewPriority}
+    end.
+
+
+% Flag a job as canceled by setting ?OPT_CANCEL => true in its options and
+% writing the job tuple back under Key; Seq, WorkerLockId and Priority are
+% preserved. If the options already carry ?OPT_CANCEL := true the stored
+% value is left untouched, so repeated cancellations do not rewrite it.
+% Always returns `ok`.
+%
+cancel(#{jtx := true}, _, {_, _, _, #{?OPT_CANCEL := true}}) ->
+    ok;
+
+cancel(#{jtx := true, tx := Tx}, Key, Job) ->
+    {Seq, WorkerLockId, Priority, JobOpts} = Job,
+    JobOpts1 = JobOpts#{?OPT_CANCEL => true},
+    JobOptsEnc = jiffy:encode(JobOpts1),
+    Val = erlfdb_tuple:pack({Seq, WorkerLockId, Priority, JobOptsEnc}),
+    erlfdb:set(Tx, Key, Val),
+    ok.
+
+
+% Write the job's data value and, when appropriate, put it on the pending
+% queue. ?OPT_RESUBMIT is always stripped from the stored options.
+%
+%   * Resubmit is true and the options do not have ?OPT_CANCEL set to true:
+%     the job is stored as {null, null, Priority, Opts} and enqueued under
+%     Priority, which defaults to ?UNSET_VS when the options carry none.
+%     Versionstamped writes are used when Priority contains ?UNSET_VS.
+%   * otherwise: the job is stored as {null, null, null, Opts} and is not
+%     enqueued.
+%
+% Always returns `ok`.
+%
+maybe_enqueue(#{jtx := true} = JTx, Type, JobId, JobOpts, Resubmit) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    Cancel = maps:get(?OPT_CANCEL, JobOpts, false) == true,
+    JobOpts1 = maps:without([?OPT_RESUBMIT], JobOpts),
+    Priority = maps:get(?OPT_PRIORITY, JobOpts1, ?UNSET_VS),
+    JobOptsEnc = jiffy:encode(JobOpts1),
+    case Resubmit andalso not Cancel of
+        true ->
+            case has_versionstamp(Priority) of
+                true ->
+                    Val = erlfdb_tuple:pack_vs({null, null, Priority,
+                        JobOptsEnc}),
+                    erlfdb:set_versionstamped_value(Tx, Key, Val);
+                false ->
+                    Val = erlfdb_tuple:pack({null, null, Priority,
+                        JobOptsEnc}),
+                    erlfdb:set(Tx, Key, Val)
+            end,
+            couch_jobs_pending:enqueue(JTx, Type, Priority, JobId);
+        false ->
+            Val = erlfdb_tuple:pack({null, null, null, JobOptsEnc}),
+            erlfdb:set(Tx, Key, Val)
+    end,
+    ok.
+
+
+% Read and unpack the job tuple stored under Key, or return `not_found`.
+%
+get_job({erlfdb_transaction, _} = Tx, Key) ->
+    Stored = erlfdb:wait(erlfdb:get(Tx, Key)),
+    case Stored of
+        not_found -> not_found;
+        <<_/binary>> -> unpack_job(Stored)
+    end.
+
+
+% Unpack a stored job value into {Seq, WorkerLockId, Priority, JobOpts},
+% decoding the JSON-encoded options into a map.
+%
+unpack_job(<<_/binary>> = Packed) ->
+    {Seq, WorkerLockId, Priority, EncodedOpts} = erlfdb_tuple:unpack(Packed),
+    {Seq, WorkerLockId, Priority, jiffy:decode(EncodedOpts, [return_maps])}.
+
+
+% Read the job under Key and classify it from the point of view of the
+% worker identified by WorkerLockId. Returns {Status, Job} where Status is:
+%   worker_conflict - the stored lock id differs from WorkerLockId, or the
+%                     job does not exist (Job is then `not_found`)
+%   canceled        - lock matches and the options have ?OPT_CANCEL := true
+%   ok              - lock matches and the job is not flagged as canceled
+% The clause order matters: the lock check takes precedence over the cancel
+% flag.
+%
+get_job_and_status(Tx, Key, WorkerLockId) ->
+    case get_job(Tx, Key) of
+        {_, LockId, _, _} = Res when WorkerLockId =/= LockId ->
+            {worker_conflict, Res};
+        {_, _, _, #{?OPT_CANCEL := true}} = Res ->
+            {canceled, Res};
+        {_, _, _, #{}} = Res ->
+            {ok, Res};
+        not_found ->
+            {worker_conflict, not_found}
+    end.
+
+
+% Record fresh activity for JobId: clear the activity entry at Seq (unless
+% Seq is null), write a new (?ACTIVITY, Type, Versionstamp) = JobId entry
+% using a versionstamped key, and bump the type watch.
+%
+update_activity(#{jtx := true} = JTx, Type, JobId, Seq) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx,
+    case Seq =/= null of
+        true -> clear_activity(JTx, Type, Seq);
+        false -> ok
+    end,
+    Key = erlfdb_tuple:pack_vs({?ACTIVITY, Type, ?UNSET_VS}, Jobs),
+    erlfdb:set_versionstamped_key(Tx, Key, JobId),
+    update_watch(JTx, Type).
+
+
+% Clear the (?ACTIVITY, Type, Seq) key.
+%
+clear_activity(#{jtx := true} = JTx, Type, Seq) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?ACTIVITY, Type, Seq}, Jobs),
+    erlfdb:clear(Tx, Key).
+
+
+% Overwrite the (?WATCHES, Type) key with a versionstamped value. This is
+% the key that get_activity_vs_and_watch/2 reads and watches.
+%
+update_watch(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?WATCHES, Type}, Jobs),
+    Val = erlfdb_tuple:pack_vs({?UNSET_VS}),
+    erlfdb:set_versionstamped_value(Tx, Key, Val).
+
+
+% Assign WorkerLockId to a job. The stored job must currently have null Seq
+% and null WorkerLockId, otherwise the match below fails with badmatch. The
+% job is rewritten with a versionstamped Seq, the new lock id and a null
+% Priority.
+%
+update_lock(#{jtx := true} = JTx, Type, JobId, WorkerLockId) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    {null, null, _, JobOpts} = get_job(Tx, Key),
+    ValTup = {?UNSET_VS, WorkerLockId, null, jiffy:encode(JobOpts)},
+    Val = erlfdb_tuple:pack_vs(ValTup),
+    erlfdb:set_versionstamped_value(Tx, Key, Val).
+
+
+% Derive the job state from the stored lock id and priority:
+%   both null             -> finished
+%   non-null WorkerLockId -> running
+%   non-null Priority     -> pending
+%
+job_state(null, null) ->
+    finished;
+
+job_state(WorkerLockId, _Priority) when WorkerLockId =/= null ->
+    running;
+
+job_state(_WorkerLockId, Priority) when Priority =/= null ->
+    pending.
+
+
+% This is a transaction context object similar to the Db = #{} one from
+% fabric2_fdb. It is used to cache the jobs path directory (to avoid extra
+% lookups on every operation) and to check for metadata changes (in case
+% the directory changes).
+%
+% With `undefined` the object is built inside a new transaction; with an
+% erlfdb transaction it is built using that transaction.
+%
+init_jtx(undefined) ->
+    fabric2_fdb:transactional(fun(Tx) -> init_jtx(Tx) end);
+
+init_jtx({erlfdb_transaction, _} = Tx) ->
+    Root = erlfdb_directory:root(),
+    CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+    LayerPrefix = erlfdb_directory:get_name(CouchDB),
+    Jobs = erlfdb_tuple:pack({?JOBS}, LayerPrefix),
+    Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
+    % layer_prefix, md_version and tx here match db map fields in fabric2_fdb
+    % but we also assert that this is a job transaction using the jtx => true
+    % field
+    #{
+        jtx => true,
+        tx => Tx,
+        layer_prefix => LayerPrefix,
+        jobs_path => Jobs,
+        md_version => Version
+    }.
+
+
+% Compare the cached md_version with the value currently stored under
+% ?METADATA_VERSION_KEY. If they are equal JTx is returned as is; otherwise
+% a new object is built with Tx and stored in the cache.
+%
+ensure_current(#{jtx := true, tx := Tx, md_version := Version} = JTx) ->
+    case erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)) of
+        Version -> JTx;
+        _NewVersion -> update_jtx_cache(init_jtx(Tx))
+    end.
+
+
+% Store JTx in the ETS cache with its `tx` field reset to undefined (the
+% transaction handle itself is not cached). Returns the original JTx.
+%
+update_jtx_cache(#{jtx := true} = JTx) ->
+    CachedJTx = JTx#{tx := undefined},
+    ets:insert(?MODULE, {?JOBS, CachedJTx}),
+    JTx.
diff --git a/src/couch_jobs/src/couch_jobs_notifier.erl b/src/couch_jobs/src/couch_jobs_notifier.erl
new file mode 100644
index 0000000..59fa31a
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_notifier.erl
@@ -0,0 +1,218 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_notifier).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/1,
+    subscribe/3,
+    subscribe/4,
+    unsubscribe/2
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-define(TYPE_MONITOR_HOLDOFF_DEFAULT, 1000).
+-define(TYPE_MONITOR_TIMEOUT_DEFAULT, "infinity").
+
+
+-record(st, {
+    jtx,
+    type,
+    monitor_pid,
+    subs
+}).
+
+
+% Start an unregistered notifier gen_server for the given job Type.
+%
+start_link(Type) ->
+    gen_server:start_link(?MODULE, [Type], []).
+
+
+% Subscribe Pid to state changes of JobId; notifications are sent to Pid as
+% messages (see notify/6). Replies `not_found`, `finished`, or
+% {Ref, JobState} as produced by the subscribe clause of handle_call/3.
+%
+subscribe(Server, JobId, Pid) when is_pid(Pid) ->
+    gen_server:call(Server, {subscribe, JobId, nil, Pid}, infinity).
+
+
+% Like subscribe/3, but notifications are delivered by calling the arity-3
+% Fun as Fun(Type, JobId, JobState) inside the notifier process (see
+% notify/6). Pid is still monitored by the notifier.
+%
+subscribe(Server, JobId, Fun, Pid) when is_function(Fun, 3), is_pid(Pid) ->
+    gen_server:call(Server, {subscribe, JobId, Fun, Pid}, infinity).
+
+
+% Remove the subscription identified by Ref, the reference returned from
+% subscribe/3,4.
+%
+unsubscribe(Server, Ref) when is_reference(Ref) ->
+    gen_server:call(Server, {unsubscribe, Ref}, infinity).
+
+
+% gen_server callback. Creates the subscriptions table (an ordered_set keyed
+% by {JobId, Ref}), reads the current activity versionstamp for Type and
+% starts a couch_jobs_type_monitor with it and the configured holdoff and
+% timeout.
+%
+init([Type]) ->
+    JTx = couch_jobs_fdb:get_jtx(),
+    EtsOpts = [ordered_set, protected],
+    St = #st{jtx = JTx, type = Type, subs = ets:new(?MODULE, EtsOpts)},
+    VS = get_type_vs(St),
+    HoldOff = get_holdoff(),
+    Timeout = get_timeout(),
+    Pid = couch_jobs_type_monitor:start(Type, VS, HoldOff, Timeout),
+    {ok, St#st{monitor_pid = Pid}}.
+
+
+% gen_server callback. No cleanup is performed.
+%
+terminate(_, _St) ->
+    ok.
+
+
+% gen_server callback.
+%
+% {subscribe, JobId, Fun, Pid}: replies `not_found` or `finished` without
+%   registering anything; otherwise monitors Pid, stores the subscription
+%   under {JobId, Ref} and replies {Ref, JobState}.
+% {unsubscribe, Ref}: removes the subscription and releases the monitor
+%   that was taken when subscribing.
+% {type_updated, VS}: notifies subscribers about changes since VS.
+% Any other call stops the server with a bad_call reason.
+%
+handle_call({subscribe, JobId, Fun, Pid}, _From, #st{} = St) ->
+    Res = case get_job(St, JobId) of
+        not_found ->
+            not_found;
+        {ok, _, finished} ->
+            finished;
+        {ok, _, JobState} ->
+            Ref = erlang:monitor(process, Pid),
+            ets:insert(St#st.subs, {{JobId, Ref}, {Fun, Pid, JobState}}),
+            {Ref, JobState}
+    end,
+    {reply, Res, St};
+
+handle_call({unsubscribe, Ref}, _From, #st{subs = Subs} = St) ->
+    % Only demonitor references we actually handed out. Flushing also drops
+    % a 'DOWN' message that may already be sitting in the mailbox.
+    case ets:match_object(Subs, {{'_', Ref}, '_'}) of
+        [] -> ok;
+        [_ | _] -> true = erlang:demonitor(Ref, [flush])
+    end,
+    true = ets:match_delete(Subs, {{'$1', Ref}, '_'}),
+    {reply, ok, St};
+
+% couch_jobs_type_monitor calls this
+handle_call({type_updated, VS}, _From, St) ->
+    ok = notify_subscribers(VS, St),
+    {reply, ok, St};
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+% gen_server callback. No casts are expected; any cast stops the server.
+%
+handle_cast(Msg, St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+
+% gen_server callback. A 'DOWN' message for a monitored subscriber removes
+% the subscription stored under that monitor reference. Any other message
+% stops the server.
+%
+handle_info({'DOWN', Ref, process, _, _}, #st{subs = Subs} = St) ->
+    true = ets:match_delete(Subs, {{'$1', Ref}, '_'}),
+    {noreply, St};
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+% gen_server callback. The state is carried over unchanged.
+%
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+% Look up JobId of this notifier's type in its own transaction. Returns what
+% couch_jobs_fdb:get_job/3 returns.
+%
+get_job(#st{jtx = JTx, type = Type}, JobId) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_job(JTx1, Type, JobId)
+    end).
+
+
+% Look up the state of each job in JobIds within a single transaction.
+% Returns a list of {JobId, JobState} where JobState is `not_found` for jobs
+% that no longer exist.
+%
+get_jobs(#st{jtx = JTx, type = Type}, JobIds) ->
+    StateOf = fun(TxCtx, JobId) ->
+        case couch_jobs_fdb:get_job(TxCtx, Type, JobId) of
+            not_found -> {JobId, not_found};
+            {ok, _, JobState} -> {JobId, JobState}
+        end
+    end,
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        [StateOf(JTx1, JobId) || JobId <- JobIds]
+    end).
+
+
+% Read the current activity versionstamp of this notifier's type in its own
+% transaction. Returns what couch_jobs_fdb:get_activity_vs/2 returns.
+%
+get_type_vs(#st{jtx = JTx, type = Type}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_activity_vs(JTx1, Type)
+    end).
+
+
+% "Active since" is the list of jobs that have been active (running)
+% and updated at least once since the given versionstamp. These are relatively
+% cheap to find as it's just a range read in the ?ACTIVITY subspace.
+%
+get_active_since(#st{jtx = JTx, type = Type}, VS, SubscribedJobs) ->
+    % Everything updated since VS, narrowed down to the jobs that actually
+    % have subscribers
+    Updated = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_active_since(JTx1, Type, VS)
+    end),
+    UpdatedSet = sets:from_list(Updated),
+    SubscribedSet = sets:from_list(SubscribedJobs),
+    sets:to_list(sets:intersection(UpdatedSet, SubscribedSet)).
+
+
+% Return all subscriptions of JobId as {Ref, Fun, Pid, JobState} tuples.
+%
+get_subscribers(JobId, #st{subs = Subs}) ->
+    % Use ordered ets's fast matching of partial key prefixes here
+    Matches = ets:match(Subs, {{JobId, '$1'}, '$2'}),
+    [{Ref, Fun, Pid, JobState} || [Ref, {Fun, Pid, JobState}] <- Matches].
+
+
+% Return the sorted, de-duplicated list of job ids that have at least one
+% subscription.
+%
+get_subscribed_job_ids(#st{subs = Subs}) ->
+    % Each match is a one-element list [JobId]. Unwrap it explicitly instead
+    % of using lists:flatten/1, which would also flatten a job id that
+    % happens to be a list itself.
+    Matches = ets:match(Subs, {{'$1', '_'}, '_'}),
+    lists:usort([JobId || [JobId] <- Matches]).
+
+
+% Notify subscribers about job state changes since versionstamp VS.
+%
+% For every subscribed job the current state is determined and compared to
+% the state stored with each subscription:
+%   * running -> running: the subscriber is notified on every update
+%   * same non-running state: nothing is sent
+%   * state changed: the subscriber is notified and the stored state updated
+% Once a job is `finished` or `not_found`, all of its subscriptions are
+% removed and the monitors on the subscriber processes are released.
+%
+notify_subscribers(VS, #st{subs = Subs} = St) ->
+    JobIds = get_subscribed_job_ids(St),
+    % First gather the easy (cheap) active jobs. Then with those out of way,
+    % inspect each job to get its state.
+    Active = get_active_since(St, VS, JobIds),
+    JobStates = [{JobId, running} || JobId <- Active],
+    JobStates1 = JobStates ++ get_jobs(St, JobIds -- Active),
+    lists:foreach(fun({JobId, JobState}) ->
+        Subscribers = get_subscribers(JobId, St),
+        lists:foreach(fun
+            ({Ref, Fun, Pid, running}) when JobState =:= running ->
+                notify(JobId, Ref, Fun, Pid, JobState, St);
+            ({_Ref, _Fun, _Pid, State}) when State =:= JobState ->
+                ok;
+            ({Ref, Fun, Pid, _}) ->
+                notify(JobId, Ref, Fun, Pid, JobState, St),
+                ets:insert(Subs, {{JobId, Ref}, {Fun, Pid, JobState}})
+        end, Subscribers),
+        case lists:member(JobState, [finished, not_found]) of
+            true ->
+                % The subscriptions are going away, so the monitors taken
+                % when subscribing are no longer needed.
+                lists:foreach(fun({MonRef, _, _, _}) ->
+                    erlang:demonitor(MonRef, [flush])
+                end, Subscribers),
+                ets:match_delete(Subs, {{JobId, '_'}, '_'});
+            false ->
+                ok
+        end
+    end, JobStates1).
+
+
+% Deliver a single notification. If the subscription carries an arity-3
+% callback, it is invoked as Fun(Type, JobId, JobState) in this process and
+% any exception it raises is logged rather than propagated. Otherwise a
+% {?COUCH_JOBS_EVENT, Ref, Type, JobId, JobState} message is sent to Pid.
+%
+notify(JobId, _Ref, Fun, _, JobState, St) when is_function(Fun, 3) ->
+    try
+        Fun(St#st.type, JobId, JobState)
+    catch
+        Tag:Err ->
+            ErrMsg = "~p : callback ~p failed ~p ~p =>  ~p:~p",
+            couch_log:error(ErrMsg, [?MODULE, Fun, JobId, JobState, Tag, Err])
+    end;
+
+notify(JobId, Ref, _, Pid, JobState, St) ->
+    Pid ! {?COUCH_JOBS_EVENT, Ref, St#st.type, JobId, JobState}.
+
+
+% Read the "type_monitor_holdoff_msec" setting of the "couch_jobs" config
+% section as an integer, falling back to ?TYPE_MONITOR_HOLDOFF_DEFAULT.
+%
+get_holdoff() ->
+    config:get_integer("couch_jobs", "type_monitor_holdoff_msec",
+        ?TYPE_MONITOR_HOLDOFF_DEFAULT).
+
+
+% Read the "type_monitor_timeout_msec" setting of the "couch_jobs" config
+% section. The string "infinity" (also the default) maps to the atom
+% `infinity`; any other value is converted with list_to_integer/1.
+%
+get_timeout() ->
+    Configured = config:get("couch_jobs", "type_monitor_timeout_msec",
+        ?TYPE_MONITOR_TIMEOUT_DEFAULT),
+    case Configured of
+        "infinity" -> infinity;
+        MSecStr -> list_to_integer(MSecStr)
+    end.
diff --git a/src/couch_jobs/src/couch_jobs_notifier_sup.erl b/src/couch_jobs/src/couch_jobs_notifier_sup.erl
new file mode 100644
index 0000000..81d9349
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_notifier_sup.erl
@@ -0,0 +1,64 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_notifier_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0,
+
+    start_notifier/1,
+    stop_notifier/1,
+    get_child_pids/0
+]).
+
+-export([
+    init/1
+]).
+
+
+% Start this supervisor, registered locally under the module name.
+%
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+% Start a new notifier child. The supervisor is simple_one_for_one, so
+% [Type] is appended to the child spec's start arguments.
+%
+start_notifier(Type) ->
+    supervisor:start_child(?MODULE, [Type]).
+
+
+% Terminate the notifier child identified by Pid.
+%
+stop_notifier(Pid) ->
+    supervisor:terminate_child(?MODULE, Pid).
+
+
+% Return the second (child) element of every entry reported by
+% supervisor:which_children/1 for this supervisor.
+%
+get_child_pids() ->
+    [Pid || {_Id, Pid, _Type, _Mod} <- supervisor:which_children(?MODULE)].
+
+
+% supervisor callback. Children are started on demand (simple_one_for_one)
+% through couch_jobs_notifier:start_link/1 and are never restarted
+% (temporary).
+%
+init(_) ->
+    NotifierSpec = #{
+        id => couch_jobs_notifier,
+        restart => temporary,
+        start => {couch_jobs_notifier, start_link, []}
+    },
+    SupFlags = #{
+        strategy => simple_one_for_one,
+        intensity => 10,
+        period => 3
+    },
+    {ok, {SupFlags, [NotifierSpec]}}.
diff --git a/src/couch_jobs/src/couch_jobs_pending.erl b/src/couch_jobs/src/couch_jobs_pending.erl
new file mode 100644
index 0000000..cedf53e
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_pending.erl
@@ -0,0 +1,159 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_pending).
+
+
+-export([
+    enqueue/4,
+    dequeue/3,
+    remove/4
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+% Make this configurable or auto-adjustable based on retries
+%
+-define(RANGE_LIMIT, 256).
+
+
+% Data model
+%
+% (?JOBS, ?PENDING, Type, Priority, JobId) = ""
+
+
+% Public API
+
+% Enqueue a job into the pending queue. Priority determines the place in the
+% queue. The entry is written as (?PENDING, Type, Priority, JobId) = <<>>.
+% When Priority contains ?UNSET_VS a versionstamped key is used, otherwise a
+% plain set.
+%
+enqueue(#{jtx := true} = JTx, Type, Priority, JobId) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    KeyTup = {?PENDING, Type, Priority, JobId},
+    case couch_jobs_fdb:has_versionstamp(Priority) of
+        true ->
+            Key = erlfdb_tuple:pack_vs(KeyTup, Jobs),
+            erlfdb:set_versionstamped_key(Tx, Key, <<>>);
+        false ->
+            Key = erlfdb_tuple:pack(KeyTup, Jobs),
+            erlfdb:set(Tx, Key, <<>>)
+    end.
+
+
+% Dequeue a job from the pending queue of the given Type. Returns the JobId
+% of the dequeued job, or `not_found` if no candidate was found.
+%
+% If MaxPriority is `undefined`, only jobs with the same priority as the
+% item at the front of the queue are considered. Two extremes are useful to
+% consider:
+%
+%  * Priority is just one static value (say null, or "normal"). In that case,
+%  the queue is effectively a bag of tasks that can be grabbed in any order,
+%  which should minimize contention.
+%
+%  * Each job has a unique priority value, for example a versionstamp. In that
+%  case, the queue has strict FIFO behavior, but there will be more contention
+%  at the front of the queue.
+%
+dequeue(#{jtx := true} = JTx, Type, undefined) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx,
+    Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs),
+    case get_front_priority(Tx, Prefix) of
+        not_found ->
+            not_found;
+        {{versionstamp, _, _}, PendingKey} ->
+            dequeue_front_key(Tx, Prefix, PendingKey);
+        {{versionstamp, _, _, _}, PendingKey} ->
+            dequeue_front_key(Tx, Prefix, PendingKey);
+        {Priority, _} ->
+            {Start, End} = erlfdb_tuple:range({Priority}, Prefix),
+            case clear_random_key_from_range(Tx, Start, End) of
+                not_found ->
+                    not_found;
+                <<_/binary>> = PendingKey ->
+                    {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+                    JobId
+            end
+    end;
+
+% If MaxPriority is specified, any job between the front of the queue and up
+% until MaxPriority is considered. Workers may randomly pick jobs in that
+% range. That can be used to avoid contention at the expense of strict dequeue
+% ordering. For instance if priorities are 0-high, 1-normal and 2-low, and we
+% wish to process high and normal jobs, then MaxPriority=1-normal would
+% accomplish that.
+%
+dequeue(#{jtx := true} = JTx, Type, MaxPriority) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs),
+    StartKeySel = erlfdb_key:first_greater_than(Prefix),
+    End = erlfdb_tuple:pack({MaxPriority, <<16#FF>>}, Prefix),
+    EndKeySel = erlfdb_key:first_greater_or_equal(End),
+    case clear_random_key_from_range(Tx, StartKeySel, EndKeySel) of
+        not_found ->
+            not_found;
+        <<_/binary>> = PendingKey ->
+            {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+            JobId
+    end.
+
+
+% Clear the key found at the front of the queue and return its JobId. The
+% front key is located with a snapshot read, so a read conflict is added on
+% it before clearing, in case other workers have picked the same key. This
+% mirrors what the random-pick dequeue path does.
+%
+dequeue_front_key(Tx, Prefix, PendingKey) ->
+    erlfdb:add_read_conflict_key(Tx, PendingKey),
+    erlfdb:clear(Tx, PendingKey),
+    {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+    JobId.
+
+
+% Remove a job from the pending queue by clearing its
+% (?PENDING, Type, Priority, JobId) key. This is used, for example, when a
+% job is removed while it is waiting in the pending queue.
+%
+remove(#{jtx := true} = JTx, Type, Priority, JobId) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?PENDING, Type, Priority, JobId}, Jobs),
+    erlfdb:clear(Tx, Key).
+
+
+% Private functions
+
+% Return {Priority, Key} for the item at the front of the queue, or
+% `not_found` if the queue is empty. The read uses {snapshot, true}. If
+% there are multiple items with the same priority, workers can randomly pick
+% between them to avoid contention.
+%
+get_front_priority(Tx, Prefix) ->
+    Opts = [{limit, 1}, {snapshot, true}],
+    case erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)) of
+        [] ->
+            not_found;
+        [{FrontKey, _}] ->
+            {Priority, _} = erlfdb_tuple:unpack(FrontKey, Prefix),
+            {Priority, FrontKey}
+    end.
+
+
+% Read up to ?RANGE_LIMIT keys from the range using a snapshot read, then
+% randomly pick one of them to clear. Before clearing, ensure there is a
+% read conflict on the key in case other workers have picked the same key.
+% Returns the cleared key, or `not_found` if the range is empty.
+%
+clear_random_key_from_range(Tx, Start, End) ->
+    Opts = [
+        {limit, ?RANGE_LIMIT},
+        {snapshot, true}
+    ],
+    case erlfdb:wait(erlfdb:get_range(Tx, Start, End, Opts)) of
+        [] ->
+            not_found;
+        [{_, _} | _] = KVs ->
+            Index = rand:uniform(length(KVs)),
+            {Key, _} = lists:nth(Index, KVs),
+            erlfdb:add_read_conflict_key(Tx, Key),
+            erlfdb:clear(Tx, Key),
+            Key
+    end.
diff --git a/src/couch_jobs/src/couch_jobs_server.erl b/src/couch_jobs/src/couch_jobs_server.erl
new file mode 100644
index 0000000..ad30435
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_server.erl
@@ -0,0 +1,184 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_server).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/0,
+    get_notifier_server/1,
+    force_check_types/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-define(TYPE_CHECK_PERIOD_DEFAULT, 15000).
+-define(MAX_JITTER_DEFAULT, 5000).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+get_notifier_server(Type) ->
+    case get_type_pid_refs(Type) of
+        {{_, _}, {NotifierPid, _}} -> {ok, NotifierPid};
+        not_found -> {error, not_found}
+    end.
+
+
+force_check_types() ->
+    gen_server:call(?MODULE, check_types, infinity).
+
+
+init(_) ->
+    % couch_jobs_server is started after the notifier and activity monitor
+    % supervisors. If it restarts, there could be some stale notifiers or
+    % activity monitors. Kill those as later on we'd start new ones anyway.
+    reset_monitors(),
+    reset_notifiers(),
+    ets:new(?MODULE, [protected, named_table]),
+    schedule_check(),
+    {ok, nil}.
+
+
+terminate(_, _St) ->
+    ok.
+
+
+handle_call(check_types, _From, St) ->
+    check_types(),
+    {reply, ok, St};
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(check_types, St) ->
+    check_types(),
+    schedule_check(),
+    {noreply, St};
+
+handle_info({'DOWN', _Ref, process, Pid, Reason}, St) ->
+    LogMsg = "~p : process ~p exited with ~p",
+    couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+    {stop, {unexpected_process_exit, Pid, Reason}, St};
+
+handle_info({Ref, ready}, St) when is_reference(Ref) ->
+    % Don't crash couch_jobs_server and the whole application over this.
+    % TODO: eventually do proper cleanup in the erlfdb:wait timeout code.
+    LogMsg = "~p : spurious erlfdb future ready message ~p",
+    couch_log:error(LogMsg, [?MODULE, Ref]),
+    {noreply, St};
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+% Reconcile running monitors/notifiers with the job types returned by FDB:
+% start the pair for each new type and stop it for each type that is gone.
+check_types() ->
+    Wanted = fdb_types(),
+    Running = ets_types(),
+    lists:foreach(fun start_monitors/1, Wanted -- Running),
+    lists:foreach(fun stop_monitors/1, Running -- Wanted).
+
+
+start_monitors(Type) ->
+    MonPidRef = case couch_jobs_activity_monitor_sup:start_monitor(Type) of
+        {ok, Pid1} -> {Pid1, monitor(process, Pid1)};
+        {error, Error1} -> error({failed_to_start_monitor, Type, Error1})
+    end,
+    NotifierPidRef = case couch_jobs_notifier_sup:start_notifier(Type) of
+        {ok, Pid2} -> {Pid2, monitor(process, Pid2)};
+        {error, Error2} -> error({failed_to_start_notifier, Type, Error2})
+    end,
+    ets:insert_new(?MODULE, {Type, MonPidRef, NotifierPidRef}).
+
+
+stop_monitors(Type) ->
+    {{MonPid, MonRef}, {NotifierPid, NotifierRef}} = get_type_pid_refs(Type),
+    ok = couch_jobs_activity_monitor_sup:stop_monitor(MonPid),
+    demonitor(MonRef, [flush]),
+    ok = couch_jobs_notifier_sup:stop_notifier(NotifierPid),
+    demonitor(NotifierRef, [flush]),
+    ets:delete(?MODULE, Type).
+
+
+reset_monitors() ->
+    lists:foreach(fun(Pid) ->
+        couch_jobs_activity_monitor_sup:stop_monitor(Pid)
+    end, couch_jobs_activity_monitor_sup:get_child_pids()).
+
+
+reset_notifiers() ->
+    lists:foreach(fun(Pid) ->
+        couch_jobs_notifier_sup:stop_notifier(Pid)
+    end, couch_jobs_notifier_sup:get_child_pids()).
+
+
+get_type_pid_refs(Type) ->
+    case ets:lookup(?MODULE, Type) of
+        [{_, MonPidRef, NotifierPidRef}] -> {MonPidRef, NotifierPidRef};
+        [] -> not_found
+    end.
+
+
+ets_types() ->
+    lists:flatten(ets:match(?MODULE, {'$1', '_', '_'})).
+
+
+fdb_types() ->
+    try
+        couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+            couch_jobs_fdb:get_types(JTx)
+        end)
+    catch
+        error:{timeout, _} ->
+            couch_log:warning("~p : Timed out connecting to FDB", [?MODULE]),
+            []
+    end.
+
+
+% Schedule the next type check. rand:uniform/1 needs N >= 1, so clamp jitter.
+schedule_check() ->
+    Timeout = get_period_msec(),
+    MaxJitter = max(1, max(Timeout div 2, get_max_jitter_msec())),
+    erlang:send_after(Timeout + rand:uniform(MaxJitter), self(), check_types).
+
+
+get_period_msec() ->
+    config:get_integer("couch_jobs", "type_check_period_msec",
+        ?TYPE_CHECK_PERIOD_DEFAULT).
+
+
+get_max_jitter_msec() ->
+    config:get_integer("couch_jobs", "type_check_max_jitter_msec",
+        ?MAX_JITTER_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_sup.erl b/src/couch_jobs/src/couch_jobs_sup.erl
new file mode 100644
index 0000000..d790237
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_sup.erl
@@ -0,0 +1,66 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1
+]).
+
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+    Flags = #{
+        strategy => rest_for_one,
+        intensity => 3,
+        period => 10
+    },
+    Children = [
+        #{
+            id => couch_jobs_fdb,
+            restart => transient,
+            start => {couch_jobs_fdb, init_cache, []}
+        },
+        #{
+            id => couch_jobs_activity_monitor_sup,
+            restart => permanent,
+            shutdown => brutal_kill,
+            type => supervisor,
+            start => {couch_jobs_activity_monitor_sup, start_link, []}
+        },
+        #{
+            id => couch_jobs_notifier_sup,
+            restart => permanent,
+            shutdown => brutal_kill,
+            type => supervisor,
+            start => {couch_jobs_notifier_sup, start_link, []}
+        },
+        #{
+            id => couch_jobs_server,
+            restart => permanent,
+            shutdown => brutal_kill,
+            start => {couch_jobs_server, start_link, []}
+        }
+    ],
+    {ok, {Flags, Children}}.
diff --git a/src/couch_jobs/src/couch_jobs_type_monitor.erl b/src/couch_jobs/src/couch_jobs_type_monitor.erl
new file mode 100644
index 0000000..b7ba633
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_type_monitor.erl
@@ -0,0 +1,78 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_type_monitor).
+
+
+-export([
+    start/4
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-record(st, {
+    jtx,
+    type,
+    vs,
+    parent,
+    timestamp,
+    holdoff,
+    timeout
+}).
+
+
+start(Type, VS, HoldOff, Timeout) ->
+    Parent = self(),
+    spawn_link(fun() ->
+        loop(#st{
+            jtx = couch_jobs_fdb:get_jtx(),
+            type = Type,
+            vs = VS,
+            parent = Parent,
+            timestamp = 0,
+            holdoff = HoldOff,
+            timeout = Timeout
+        })
+    end).
+
+
+% Watch the type's activity versionstamp and notify the parent when it changes
+loop(#st{vs = VS, timeout = Timeout} = St) ->
+    {St1, Watch} = case get_vs_and_watch(St) of
+        {VS1, W} when VS1 =/= VS -> {notify(St#st{vs = VS1}), W};
+        {VS, W} -> {St, W}
+    end,
+    try erlfdb:wait(Watch, [{timeout, Timeout}])
+    catch error:{timeout, _} ->
+        % Cancel the abandoned watch so stale ready messages don't pile up
+        erlfdb:cancel(Watch, [flush])
+    end,
+    loop(St1).
+
+
+% Notify the parent that the type's versionstamp changed. If the previous
+% notification was recorded less than `holdoff` msec ago, first sleep out the
+% rest of the holdoff window so that bursts of updates are rate limited.
+notify(#st{holdoff = HoldOff, parent = Pid, timestamp = Ts, vs = VS} = St) ->
+    Now = erlang:system_time(millisecond),
+    % Clamp at 0: nothing is left to wait once the holdoff has already elapsed
+    timer:sleep(max(HoldOff - (Now - Ts), 0)),
+    gen_server:call(Pid, {type_updated, VS}, infinity),
+    St#st{timestamp = Now}.
+
+
+get_vs_and_watch(#st{jtx = JTx, type = Type}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_activity_vs_and_watch(JTx1, Type)
+    end).
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
new file mode 100644
index 0000000..fe3df56
--- /dev/null
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -0,0 +1,597 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_tests).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+
+-define(DATA, <<"data">>).
+-define(RESUBMIT, <<"resubmit">>).
+-define(PRIORITY, <<"priority">>).
+
+
+couch_jobs_basic_test_() ->
+    {
+        "Test couch jobs basics",
+        {
+            setup,
+            fun setup_couch/0, fun teardown_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun add_remove_pending/1,
+                    fun add_remove_errors/1,
+                    fun get_jobs/1,
+                    fun resubmit_as_job_creator/1,
+                    fun type_timeouts_and_server/1,
+                    fun dead_notifier_restarts_jobs_server/1,
+                    fun bad_messages_restart_couch_jobs_server/1,
+                    fun bad_messages_restart_notifier/1,
+                    fun bad_messages_restart_activity_monitor/1,
+                    fun worker_accept_and_finish/1,
+                    fun worker_update/1,
+                    fun resubmit_enqueues_job/1,
+                    fun add_custom_priority/1,
+                    fun resubmit_custom_priority/1,
+                    fun accept_max_priority/1,
+                    fun subscribe/1,
+                    fun subscribe_callback/1,
+                    fun subscribe_errors/1,
+                    fun enqueue_inactive/1,
+                    fun cancel_running_job/1,
+                    fun stop_and_remove_running_job/1,
+                    fun clear_type_works/1
+                ]
+            }
+        }
+    }.
+
+
+setup_couch() ->
+    test_util:start_couch([fabric]).
+
+
+teardown_couch(Ctx) ->
+    test_util:stop_couch(Ctx),
+    meck:unload().
+
+
+setup() ->
+    couch_jobs_fdb:clear_jobs(),
+    application:start(couch_jobs),
+    T1 = <<"t1">>,
+    T2 = 424242,  % a number should work as well
+    T1Timeout = 2,
+    T2Timeout = 3,
+    couch_jobs:set_type_timeout(T1, T1Timeout),
+    couch_jobs:set_type_timeout(T2, T2Timeout),
+    couch_jobs_server:force_check_types(),
+    #{
+        t1 => T1,
+        t2 => T2,
+        t1_timeout => T1Timeout,
+        t2_timeout => T2Timeout,
+        j1 => <<"j1">>,
+        j2 => <<"j2">>,
+        j1_data => #{<<"j1_data">> => 42}
+    }.
+
+
+teardown(#{}) ->
+    application:stop(couch_jobs),
+    couch_jobs_fdb:clear_jobs(),
+    meck:unload().
+
+
+restart_app() ->
+    application:stop(couch_jobs),
+    application:start(couch_jobs),
+    couch_jobs_server:force_check_types().
+
+
+add_remove_pending(#{t1 := T, j1 := J, j1_data := Data}) ->
+    ?_test(begin
+        ?assertEqual(ok, couch_jobs:add(T, J, #{?DATA => Data})),
+        ?assertMatch({ok, #{?DATA := Data}, pending},
+            couch_jobs:get_job(T, J)),
+        ?assertEqual(ok, couch_jobs:remove(T, J)),
+        ?assertEqual(ok, couch_jobs:add(T, J, #{?DATA => Data})),
+        ?assertMatch({ok, #{?DATA := Data}, pending},
+            couch_jobs:get_job(T, J)),
+        ?assertEqual(ok, couch_jobs:remove(T, J))
+    end).
+
+
+add_remove_errors(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ?assertEqual(not_found, couch_jobs:remove(<<"bad_type">>, <<"1">>)),
+        ?assertMatch({error, {invalid_args, _}}, couch_jobs:add(T, J,
+            #{1 => 2})),
+        ?assertEqual({error, no_type_timeout}, couch_jobs:add(<<"x">>, J,
+            #{})),
+        ?assertEqual(ok, couch_jobs:add(T, J, #{})),
+        ?assertEqual({error, duplicate_job}, couch_jobs:add(T, J, #{})),
+        ?assertEqual(ok, couch_jobs:remove(T, J)),
+        ?assertEqual(not_found, couch_jobs:stop_and_remove(T, J, 100)),
+        ?assertEqual(not_found, couch_jobs:remove(T, J)),
+        ?assertMatch({error, {invalid_args, _}}, couch_jobs:add(T, J,
+            #{?RESUBMIT => potato})),
+        ?assertMatch({error, {invalid_args, _}}, couch_jobs:add(T, J,
+            #{?PRIORITY => #{bad_priority => nope}}))
+
+    end).
+
+
+get_jobs(#{t1 := T1, t2 := T2, j1 := J1, j2 := J2, j1_data := Data}) ->
+    ?_test(begin
+        ok = couch_jobs:add(T1, J1, #{?DATA => Data}),
+        ok = couch_jobs:add(T2, J2, #{}),
+
+        ?assertMatch({ok, #{?DATA := Data}, pending},
+            couch_jobs:get_job(T1, J1)),
+        ?assertEqual({ok, #{}, pending}, couch_jobs:get_job(T2, J2)),
+
+        ?assertEqual([{J1, pending, #{?DATA => Data}}],
+            couch_jobs:get_jobs(T1)),
+        ?assertEqual([{J2, pending, #{}}], couch_jobs:get_jobs(T2)),
+        ?assertEqual([], couch_jobs:get_jobs(<<"othertype">>)),
+
+        ?assertEqual(lists:sort([
+            {T1, J1, pending, #{?DATA => Data}},
+            {T2, J2, pending, #{}}
+        ]), lists:sort(couch_jobs:get_jobs())),
+        ?assertEqual(ok, couch_jobs:remove(T1, J1)),
+        ?assertEqual([{T2, J2, pending, #{}}], couch_jobs:get_jobs()),
+        ?assertEqual(ok, couch_jobs:remove(T2, J2)),
+        ?assertEqual([], couch_jobs:get_jobs())
+    end).
+
+
+resubmit_as_job_creator(#{t1 := T, j1 := J, j1_data := Data}) ->
+    ?_test(begin
+        ok = couch_jobs:add(T, J, #{?DATA => Data}),
+
+        ?assertEqual(not_found, couch_jobs:resubmit(T, <<"badjob">>)),
+
+        ?assertEqual(ok, couch_jobs:resubmit(T, J)),
+        JobOpts1 = #{?DATA => Data, ?RESUBMIT => true},
+        ?assertEqual({ok, JobOpts1, pending}, couch_jobs:get_job(T, J)),
+
+        ?assertEqual(ok, couch_jobs:resubmit(T, J)),
+        ?assertEqual({ok, JobOpts1, pending}, couch_jobs:get_job(T, J)),
+
+        ?assertEqual(ok, couch_jobs:resubmit(T, J, <<"a">>)),
+        JobOpts2 = #{?DATA => Data, ?RESUBMIT => true, ?PRIORITY => <<"a">>},
+        ?assertEqual({ok, JobOpts2, pending}, couch_jobs:get_job(T, J)),
+
+        ?assertEqual(ok, couch_jobs:resubmit(T, J, <<"b">>)),
+        JobOpts3 = #{?DATA => Data, ?RESUBMIT => true, ?PRIORITY => <<"b">>},
+        ?assertEqual({ok, JobOpts3, pending}, couch_jobs:get_job(T, J))
+    end).
+
+
+type_timeouts_and_server(#{t1 := T, t1_timeout := T1Timeout}) ->
+    ?_test(begin
+        ?assertEqual(T1Timeout, couch_jobs:get_type_timeout(T)),
+
+        ?assertEqual(2,
+            length(couch_jobs_activity_monitor_sup:get_child_pids())),
+        ?assertEqual(2, length(couch_jobs_notifier_sup:get_child_pids())),
+        ?assertMatch({ok, _}, couch_jobs_server:get_notifier_server(T)),
+
+        ?assertEqual(ok, couch_jobs:set_type_timeout(<<"t3">>, 8)),
+        couch_jobs_server:force_check_types(),
+        ?assertEqual(3,
+            length(couch_jobs_activity_monitor_sup:get_child_pids())),
+        ?assertEqual(3, length(couch_jobs_notifier_sup:get_child_pids())),
+
+        ?assertEqual(ok, couch_jobs:clear_type_timeout(<<"t3">>)),
+        couch_jobs_server:force_check_types(),
+        ?assertEqual(2,
+            length(couch_jobs_activity_monitor_sup:get_child_pids())),
+        ?assertEqual(2,
+            length(couch_jobs_notifier_sup:get_child_pids())),
+        ?assertMatch({error, _},
+            couch_jobs_server:get_notifier_server(<<"t3">>)),
+
+        ?assertEqual(not_found, couch_jobs:get_type_timeout(<<"t3">>))
+    end).
+
+
+dead_notifier_restarts_jobs_server(#{}) ->
+    ?_test(begin
+        ServerPid = whereis(couch_jobs_server),
+        Ref = monitor(process, ServerPid),
+
+        [Notifier1, _Notifier2] = couch_jobs_notifier_sup:get_child_pids(),
+        exit(Notifier1, kill),
+
+        % Killing a notifier should kill the server as well
+        receive {'DOWN', Ref, _, _, _} -> ok end
+    end).
+
+
+bad_messages_restart_couch_jobs_server(#{}) ->
+    ?_test(begin
+        % couch_jobs_server dies on bad cast
+        ServerPid1 = whereis(couch_jobs_server),
+        Ref1 = monitor(process, ServerPid1),
+        gen_server:cast(ServerPid1, bad_cast),
+        receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+        restart_app(),
+
+        % couch_jobs_server dies on bad call
+        ServerPid2 = whereis(couch_jobs_server),
+        Ref2 = monitor(process, ServerPid2),
+        catch gen_server:call(ServerPid2, bad_call),
+        receive {'DOWN', Ref2, _, _, _} -> ok end,
+
+        restart_app(),
+
+        % couch_jobs_server dies on bad info
+        ServerPid3 = whereis(couch_jobs_server),
+        Ref3 = monitor(process, ServerPid3),
+        ServerPid3 ! a_random_message,
+        receive {'DOWN', Ref3, _, _, _} -> ok end,
+
+        restart_app()
+    end).
+
+
+bad_messages_restart_notifier(#{}) ->
+    ?_test(begin
+        % bad cast kills the notifier
+        [AMon1, _] = couch_jobs_notifier_sup:get_child_pids(),
+        Ref1 = monitor(process, AMon1),
+        gen_server:cast(AMon1, bad_cast),
+        receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+        restart_app(),
+
+        % bad call kills the notifier
+        [AMon2, _] = couch_jobs_notifier_sup:get_child_pids(),
+        Ref2 = monitor(process, AMon2),
+        catch gen_server:call(AMon2, bad_call),
+        receive {'DOWN', Ref2, _, _, _} -> ok end,
+
+        restart_app(),
+
+        % bad info message kills the notifier
+        [AMon3, _] = couch_jobs_notifier_sup:get_child_pids(),
+        Ref3 = monitor(process, AMon3),
+        AMon3 ! a_bad_message,
+        receive {'DOWN', Ref3, _, _, _} -> ok end,
+
+
+        restart_app()
+    end).
+
+
+bad_messages_restart_activity_monitor(#{}) ->
+    ?_test(begin
+        % bad cast kills the activity monitor
+        [AMon1, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+        Ref1 = monitor(process, AMon1),
+        gen_server:cast(AMon1, bad_cast),
+        receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+        restart_app(),
+
+        % bad calls restart activity monitor
+        [AMon2, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+        Ref2 = monitor(process, AMon2),
+        catch gen_server:call(AMon2, bad_call),
+        receive {'DOWN', Ref2, _, _, _} -> ok end,
+
+        restart_app(),
+
+        % bad info message kills activity monitor
+        [AMon3, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+        Ref3 = monitor(process, AMon3),
+        AMon3 ! a_bad_message,
+        receive {'DOWN', Ref3, _, _, _} -> ok end,
+
+
+        restart_app()
+    end).
+
+
+worker_accept_and_finish(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(T, J, #{}),
+
+        AcceptResponse = couch_jobs:accept(T),
+        ?assertMatch({ok, J, <<_/binary>>}, AcceptResponse),
+        {ok, J, WLock} = AcceptResponse,
+
+        ?assertEqual({ok, #{}, running}, couch_jobs:get_job(T, J)),
+        Res = #{<<"result">> => <<"done">>},
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:finish(Tx, T, J, #{?DATA => Res}, WLock)
+        end)),
+        ?assertEqual({ok, #{?DATA => Res}, finished},
+            couch_jobs:get_job(T, J)),
+
+        ?assertEqual(ok, couch_jobs:remove(T, J))
+    end).
+
+
+worker_update(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(T, J, #{}),
+
+        AcceptResponse = couch_jobs:accept(T),
+        ?assertMatch({ok, J, <<_/binary>>}, AcceptResponse),
+        {ok, J, WLock} = AcceptResponse,
+
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock)
+        end)),
+        ?assertEqual({ok, #{?DATA => 1}, running}, couch_jobs:get_job(T, J)),
+
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:update(Tx, T, J, #{?DATA => 2}, WLock)
+        end)),
+        ?assertEqual({ok, #{?DATA => 2}, running}, couch_jobs:get_job(T, J)),
+
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:finish(Tx, T, J, #{?DATA => 3}, WLock)
+        end)),
+        ?assertEqual({ok, #{?DATA => 3}, finished}, couch_jobs:get_job(T, J)),
+
+        ?assertEqual(worker_conflict, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:update(Tx, T, J, #{?DATA => 4}, WLock)
+        end)),
+
+        ?assertMatch(not_found, couch_jobs:accept(T)),
+
+        ?assertEqual(ok, couch_jobs:remove(T, J))
+    end).
+
+
+resubmit_enqueues_job(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(T, J, #{}),
+
+        {ok, J, WLock1} = couch_jobs:accept(T),
+
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:resubmit(Tx, T, J, undefined, WLock1)
+        end)),
+
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:finish(Tx, T, J, #{?DATA => 1}, WLock1)
+        end)),
+
+        ?assertEqual({ok, #{?DATA => 1}, pending}, couch_jobs:get_job(T, J)),
+
+        {ok, J, WLock2} =  couch_jobs:accept(T),
+
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:finish(Tx, T, J, #{?DATA => 2}, WLock2)
+        end)),
+        ?assertEqual({ok, #{?DATA => 2}, finished}, couch_jobs:get_job(T, J)),
+
+        ?assertEqual(ok, couch_jobs:remove(T, J))
+    end).
+
+
+add_custom_priority(#{t1 := T, j1 := J1, j2 := J2}) ->
+    ?_test(begin
+        ?assertEqual(ok, couch_jobs:add(T, J1, #{?PRIORITY => 5})),
+        ?assertEqual(ok, couch_jobs:add(T, J2, #{?PRIORITY => 3})),
+
+        ?assertMatch({ok, J2, _}, couch_jobs:accept(T)),
+        ?assertMatch({ok, J1, _}, couch_jobs:accept(T)),
+        ?assertMatch(not_found, couch_jobs:accept(T))
+    end).
+
+
+resubmit_custom_priority(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ?assertEqual(ok, couch_jobs:add(T, J, #{?PRIORITY => 7})),
+        {ok, J, WLock} = couch_jobs:accept(T),
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:resubmit(Tx, T, J, 9, WLock)
+        end)),
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:finish(Tx, T, J, #{?DATA => 1}, WLock)
+        end)),
+        ?assertEqual({ok, #{?DATA => 1, ?PRIORITY => 9}, pending},
+            couch_jobs:get_job(T, J))
+    end).
+
+
+accept_max_priority(#{t1 := T, j1 := J1, j2 := J2}) ->
+    ?_test(begin
+        ok = couch_jobs:add(T, J1, #{?PRIORITY => <<"5">>}),
+        ok = couch_jobs:add(T, J2, #{?PRIORITY => <<"3">>}),
+        ?assertEqual(not_found, couch_jobs:accept(T, <<"2">>)),
+        ?assertMatch({ok, J2, _}, couch_jobs:accept(T, <<"3">>)),
+        ?assertMatch({ok, J1, _}, couch_jobs:accept(T, <<"9">>))
+    end).
+
+
+subscribe(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(T, J, #{}),
+
+        SubRes0 =  couch_jobs:subscribe(T, J),
+        ?assertMatch({ok, {_, _}, pending}, SubRes0),
+        {ok, SubId0, pending} = SubRes0,
+
+        ?assertEqual(ok, couch_jobs:unsubscribe(SubId0)),
+
+        SubRes =  couch_jobs:subscribe(T, J),
+        ?assertMatch({ok, {_, _}, pending}, SubRes),
+        {ok, SubId, pending} = SubRes,
+
+        {ok, J, WLock} = couch_jobs:accept(T),
+        ?assertMatch(timeout, couch_jobs:wait_job_state(SubId, finished, 50)),
+        ?assertMatch({T, J, running}, couch_jobs:wait_job_state(SubId, 5000)),
+
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock)
+        end)),
+
+        % Make sure we get intermediate `running` updates
+        ?assertMatch({T, J, running}, couch_jobs:wait_job_state(SubId, 5000)),
+
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:finish(Tx, T, J, #{}, WLock)
+        end)),
+        ?assertMatch({T, J, finished}, couch_jobs:wait_job_state(SubId,
+            5000)),
+
+        ?assertEqual(timeout, couch_jobs:wait_job_state(SubId, 50)),
+        ?assertEqual(finished, couch_jobs:subscribe(T, J)),
+
+        ?assertEqual(ok, couch_jobs:remove(T, J))
+    end).
+
+
+subscribe_callback(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(T, J, #{}),
+
+        TestPid = self(),
+        SomeRef = make_ref(),
+        Cbk = fun(Type, JobId, JobState) ->
+            TestPid ! {SomeRef, Type, JobId, JobState}
+        end,
+
+        SubRes = couch_jobs:subscribe(T, J, Cbk),
+        ?assertMatch({ok, {_, _}, pending}, SubRes),
+        {ok, _, pending} = SubRes,
+
+        {ok, J, WLock} = couch_jobs:accept(T),
+        receive {SomeRef, T, J, running} -> ok end,
+
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:finish(Tx, T, J, #{}, WLock)
+        end)),
+        receive {SomeRef, T, J, finished} -> ok end
+    end).
+
+
+subscribe_errors(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(T, J, #{}),
+
+        Cbk = fun(_Type, _JobId, _JobState) -> ok end,
+
+        ?assertMatch({error, _}, couch_jobs:subscribe(<<"badtype">>, J)),
+        ?assertMatch({error, _}, couch_jobs:subscribe(<<"badtype">>, J, Cbk)),
+
+        ?assertEqual(not_found, couch_jobs:subscribe(T, <<"j5">>)),
+        ?assertEqual(not_found, couch_jobs:subscribe(T, <<"j5">>, Cbk)),
+
+        {ok, J, WLock} = couch_jobs:accept(T),
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:finish(Tx, T, J, #{}, WLock)
+        end)),
+
+        ?assertEqual(finished, couch_jobs:subscribe(T, J)),
+        ?assertEqual(finished, couch_jobs:subscribe(T, J, Cbk))
+    end).
+
+
+enqueue_inactive(#{t1 := T, j1 := J, t1_timeout := Timeout}) ->
+    {timeout, 15, ?_test(begin
+        ok = couch_jobs:add(T, J, #{}),
+
+        {ok, J, WLock} = couch_jobs:accept(T),
+
+        {ok, SubId, running} = couch_jobs:subscribe(T, J),
+        ?assertEqual({T, J, pending}, couch_jobs:wait_job_state(SubId,
+            pending, 3 * Timeout * 1000)),
+
+        ?assertMatch({ok, #{}, pending}, couch_jobs:get_job(T, J)),
+
+        ?assertEqual(worker_conflict, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock)
+        end)),
+
+
+        ?assertEqual(worker_conflict, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:finish(Tx, T, J, #{}, WLock)
+        end)),
+
+
+        ?assertEqual(worker_conflict, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:resubmit(Tx, T, J, undefined, WLock)
+        end))
+    end)}.
+
+
+cancel_running_job(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(T, J, #{}),
+        {ok, J, WLock} = couch_jobs:accept(T),
+        ?assertEqual(canceled, couch_jobs:remove(T, J)),
+        % Try again and it should be a no-op
+        ?assertEqual(canceled, couch_jobs:remove(T, J)),
+
+        ?assertEqual(canceled, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock)
+        end)),
+
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:finish(Tx, T, J, #{?DATA => 2}, WLock)
+        end)),
+
+        ?assertMatch({ok, #{?DATA := 2}, finished}, couch_jobs:get_job(T, J)),
+
+        ?assertEqual(ok, couch_jobs:remove(T, J))
+    end).
+
+
+stop_and_remove_running_job(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(T, J, #{}),
+        {ok, J, WLock} = couch_jobs:accept(T),
+        {_, Ref} = spawn_monitor(fun() ->
+            exit({result, couch_jobs:stop_and_remove(T, J, 10000)})
+        end),
+
+        fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock)
+        end),
+
+        fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:update(Tx, T, J, #{?DATA => 2}, WLock)
+        end),
+
+        ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+            couch_jobs:finish(Tx, T, J, #{?DATA => 3}, WLock)
+        end)),
+
+        Exit = receive {'DOWN', Ref, _, _, Reason} -> Reason end,
+        ?assertEqual({result, ok}, Exit),
+
+        ?assertMatch(not_found, couch_jobs:get_job(T, J))
+    end).
+
+
+clear_type_works(#{t1 := T, j1 := J}) ->
+    ?_test(begin
+        ok = couch_jobs:add(T, J, #{}),
+        ?assertEqual([{J, pending, #{}}], couch_jobs:get_jobs(T)),
+        couch_jobs_fdb:clear_type(T),
+        ?assertEqual([], couch_jobs:get_jobs(T))
+    end).