You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/05/28 21:49:47 UTC

[couchdb] branch prototype/rfc-couch-jobs updated (db4d1dc -> b00a64d)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch prototype/rfc-couch-jobs
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard db4d1dc  CouchDB background jobs WIP3
     new b00a64d  CouchDB background jobs WIP3

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (db4d1dc)
            \
             N -- N -- N   refs/heads/prototype/rfc-couch-jobs (b00a64d)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch_jobs/src/couch_jobs.erl                  | 137 +++++++++++++++------
 src/couch_jobs/src/couch_jobs.hrl                  |  10 ++
 src/couch_jobs/src/couch_jobs_activity_monitor.erl |   2 +
 src/couch_jobs/src/couch_jobs_fdb.erl              |  49 ++++++--
 src/couch_jobs/src/couch_jobs_notifier.erl         |   6 +-
 src/couch_jobs/src/couch_jobs_server.erl           |   2 +-
 6 files changed, 155 insertions(+), 51 deletions(-)


[couchdb] 01/01: CouchDB background jobs WIP3

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/rfc-couch-jobs
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b00a64d5fccd298e398c3a3a90a2dad2d59c528f
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Thu May 16 19:11:50 2019 -0400

    CouchDB background jobs WIP3
    
    So far got:
     - Main API module (couch_jobs.er)
     - Supervisor and app structures
     - FDB read/write code in couch_jobs_fdb.erl
     - All jobs creation API: add(), remove(), submit(), get_job()
     - Workers API: finish(), resubmit(), update() and accept()
     - Directory path caching
     - Metadata changes check
     - Activity monitor
     - Type monitor (start notifiers and activity monitors when new types detected)
     - Job state subscription: subscribe(), unsubscribe()
    
    Still need:
     - An example worker
     - Tests
---
 rebar.config.script                                |   1 +
 rel/reltool.config                                 |   2 +
 src/couch_jobs/.gitignore                          |   4 +
 src/couch_jobs/README.md                           |   5 +
 src/couch_jobs/src/couch_jobs.app.src              |  28 ++
 src/couch_jobs/src/couch_jobs.erl                  | 250 +++++++++++
 src/couch_jobs/src/couch_jobs.hrl                  |  50 +++
 src/couch_jobs/src/couch_jobs_activity_monitor.erl | 133 ++++++
 .../src/couch_jobs_activity_monitor_sup.erl        |  57 +++
 src/couch_jobs/src/couch_jobs_app.erl              |  26 ++
 src/couch_jobs/src/couch_jobs_fdb.erl              | 496 +++++++++++++++++++++
 src/couch_jobs/src/couch_jobs_notifier.erl         | 210 +++++++++
 src/couch_jobs/src/couch_jobs_notifier_sup.erl     |  57 +++
 src/couch_jobs/src/couch_jobs_pending.erl          | 150 +++++++
 src/couch_jobs/src/couch_jobs_server.erl           | 153 +++++++
 src/couch_jobs/src/couch_jobs_sup.erl              |  66 +++
 src/couch_jobs/src/couch_jobs_type_monitor.erl     |  81 ++++
 17 files changed, 1769 insertions(+)

diff --git a/rebar.config.script b/rebar.config.script
index 3b58bcb..2def724 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -76,6 +76,7 @@ SubDirs = [
     "src/couch_tests",
     "src/ddoc_cache",
     "src/fabric",
+    "src/couch_jobs",
     "src/global_changes",
     "src/mango",
     "src/rexi",
diff --git a/rel/reltool.config b/rel/reltool.config
index 1051d2e..afebc44 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -34,6 +34,7 @@
         couch,
         couch_epi,
         couch_index,
+        couch_jobs,
         couch_log,
         couch_mrview,
         couch_plugins,
@@ -90,6 +91,7 @@
     {app, config, [{incl_cond, include}]},
     {app, couch, [{incl_cond, include}]},
     {app, couch_epi, [{incl_cond, include}]},
+    {app, couch_jobs, [{incl_cond, include}]},
     {app, couch_index, [{incl_cond, include}]},
     {app, couch_log, [{incl_cond, include}]},
     {app, couch_mrview, [{incl_cond, include}]},
diff --git a/src/couch_jobs/.gitignore b/src/couch_jobs/.gitignore
new file mode 100644
index 0000000..6ef4c52
--- /dev/null
+++ b/src/couch_jobs/.gitignore
@@ -0,0 +1,4 @@
+*.beam
+.eunit
+ebin/couch_jobs.app
+.DS_Store
\ No newline at end of file
diff --git a/src/couch_jobs/README.md b/src/couch_jobs/README.md
new file mode 100644
index 0000000..b2910a5
--- /dev/null
+++ b/src/couch_jobs/README.md
@@ -0,0 +1,5 @@
+CouchDB Jobs Application
+=========================
+
+Run background jobs in CouchDB
+
diff --git a/src/couch_jobs/src/couch_jobs.app.src b/src/couch_jobs/src/couch_jobs.app.src
new file mode 100644
index 0000000..c9b8b2c
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.app.src
@@ -0,0 +1,28 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_jobs, [
+    {description, "CouchDB Jobs"},
+    {vsn, git},
+    {mod, {couch_jobs_app, []}},
+    {registered, [
+        couch_jobs_sup
+    ]},
+    {applications, [
+        kernel,
+        stdlib,
+        erlfdb,
+        couch_log,
+        config,
+        fabric
+    ]}
+]}.
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
new file mode 100644
index 0000000..76c87f1
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -0,0 +1,250 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs).
+
+-export([
+    add/3,
+    remove/2,
+    stop_and_remove/3,
+    resubmit/2,
+    resubmit/3,
+    get_job/2,
+
+    accept/1,
+    accept/2,
+    finish/5,
+    resubmit/5,
+    update/5,
+
+    subscribe/2,
+    subscribe/3,
+    unsubscribe/1,
+    wait_job_state/2,
+    wait_job_state/3
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+%% Job Creation API
+
+-spec add(job_type(), job_id(), job_opts()) -> ok | {error, any()}.
+add(Type, JobId, JobOpts) ->
+    try
+        ok = validate_job_opts(JobOpts)
+    catch
+        Tag:Err -> {error, {invalid_job_args, Tag, Err}}
+    end,
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_job_fdb:add(JTx, Type, JobId, JobOpts)
+    end).
+
+
+-spec remove(job_type(), job_id()) -> ok | not_found | canceled.
+remove(Type, JobId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+         couch_jobs_fdb:remove(JTx, Type, JobId)
+    end).
+
+
+-spec stop_and_remove(job_type(), job_id(), timeout()) ->
+    ok | not_found | timeout.
+stop_and_remove(Type, JobId, Timeout) ->
+    case remove(Type, JobId) of
+        not_found -> not_found;
+        ok -> ok;
+        canceled ->
+            case subscribe(Type, JobId) of
+                not_found -> not_found;
+                finished -> ok;
+                {ok, SubId, _JobState} ->
+                    case wait_job_state(SubId, finished, Timeout) of
+                        timeout -> timeout;
+                        {Type, JobId, finished} -> ok
+                    end
+            end
+    end.
+
+
+-spec resubmit(job_type(), job_id()) -> ok | not_found.
+resubmit(Type, JobId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:resubmit(JTx, Type, JobId)
+    end).
+
+
+-spec resubmit(job_type(), job_id(), job_priority()) -> ok | not_found.
+resubmit(Type, JobId, NewPriority) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:resubmit(JTx, Type, JobId, NewPriority)
+    end).
+
+
+-spec get_job(job_type(), job_id()) -> {ok, job_opts(), job_state()} | not_found.
+get_job(Type, JobId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:get_job(JTx, Type, JobId)
+    end).
+
+
+%% Worker Implementation API
+
+-spec accept(job_type()) -> {ok, job_id(), worker_lock()} | not_found.
+accept(Type) ->
+    accept(Type, undefined).
+
+
+-spec accept(job_type(), job_priority()) -> {ok, job_id(), worker_lock()}
+    | not_found.
+accept(Type, MaxPriority) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:accept(JTx, Type, MaxPriority)
+    end).
+
+
+-spec finish(jtx(), job_type(), job_id(), job_opts(), worker_lock()) -> ok |
+     worker_conflict | no_return().
+finish(Tx, Type, JobId, JobOpts, WorkerLockId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:finish(JTx, Type, JobId, JobOpts, WorkerLockId)
+    end).
+
+
+-spec resubmit(jtx(), job_type(), job_id(), job_priority() | undefined,
+    worker_lock()) -> ok | worker_conflict | canceled | no_return().
+resubmit(Tx, Type, JobId, NewPriority, WorkerLockId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:resubmit(JTx, Type, JobId, NewPriority, WorkerLockId)
+    end).
+
+
+-spec update(jtx(), job_type(), job_id(), job_opts(), worker_lock()) -> ok |
+    worker_conflict | canceled | no_return().
+update(Tx, Type, JobId, JobOpts, WorkerLockId) ->
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+        couch_jobs_fdb:update(JTx, Type, JobId, JobOpts, WorkerLockId)
+    end).
+
+
+
+%% Subscription API
+
+% Receive events as messages. Wait for them using `wait_job_state/2,3`
+% functions.
+%
+-spec subscribe(job_type(), job_id()) -> {ok, job_subscription(), job_state()}
+    | not_found | {error, any()}.
+subscribe(Type, JobId) ->
+    case couch_jobs_server:get_notifier_server(Type) of
+        {ok, Server} ->
+            case couch_jobs_notifier:subscribe(Server, JobId, self()) of
+                {Ref, JobState} -> {ok, {Server, Ref}, JobState};
+                not_found -> not_found;
+                finished -> finished
+            end;
+        {error, Error} ->
+            {error, Error}
+    end.
+
+
+% Receive events as callbacks. Callback arguments will be:
+%    Fun(SubscriptionRef, Type, JobId, JobState)
+%
+% Returns:
+%   - {ok, SubscriptionRef, CurrentState} where:
+%      - SubscriptionRef is opaque reference for the subscription
+%      - CurrentState is the current job state
+%   - `not_found` if job was not found
+%   - `finished` if job is already in the `finished` state
+%
+-spec subscribe(job_type(), job_id(), job_callback()) -> {ok,
+    job_subscription(), job_state()} | not_found | {error, any()}.
+subscribe(Type, JobId, Fun) when is_function(Fun, 4) ->
+    case couch_jobs_server:get_notifier_server(Type) of
+        {ok, Server} ->
+             case couch_jobs_notifier:subscribe(Server, JobId, Fun, self()) of
+                 {Ref, JobState} -> {ok, {Server, Ref}, JobState};
+                 not_found -> not_found;
+                 finished -> finished
+             end;
+        {error, Error} ->
+            {error, Error}
+    end.
+
+
+% Unsubscribe from getting notifications based on a particular subscription.
+% Each subscription should be followed by its own unsubscription call. However,
+% subscriber processes are also monitored and auto-unsubscribed if they exit.
+% So the subscribing process is exiting, calling this function is optional.
+%
+-spec unsubscribe(job_subscription()) -> ok.
+unsubscribe({Server, Ref}) when is_pid(Server), is_reference(Ref) ->
+    try
+        couch_jobs_notifier:unsubscribe(Server, Ref)
+    after
+        flush_notifications(Ref)
+    end.
+
+
+% Wait to receive job state updates as messages.
+%
+-spec wait_job_state(job_subscription(), timeout()) -> {job_type(), job_id(),
+    job_state()} | timeout.
+wait_job_state({_, Ref}, Timeout) ->
+    receive
+        {?COUCH_JOBS_EVENT, Ref, Type, JobId, JobState} ->
+            {Type, JobId, JobState}
+    after
+        Timeout -> timeout
+    end.
+
+
+% Wait for a particular job state received as a message.
+%
+-spec wait_job_state(job_subscription(), job_state(), timeout()) ->
+    {job_type(), job_id(), job_state()} | timeout.
+wait_job_state({_, Ref}, JobState, Timeout) ->
+    receive
+        {?COUCH_JOBS_EVENT, Ref, Type, JobId, JobState} ->
+            {Type, JobId, JobState}
+    after
+        Timeout -> timeout
+    end.
+
+
+%% Private utilities
+
+validate_job_opts(#{} = JobOpts) ->
+    jiffy:encode(JobOpts),
+    case maps:get(?OPT_RESUBMIT, JobOpts, undefined) of
+        undefined -> ok;
+        true -> ok;
+        Resubmit -> error({invalid_resubmit, Resubmit, JobOpts})
+    end,
+    case maps:get(?OPT_PRIORITY, JobOpts, undefined) of
+        undefined -> ok;
+        Tuple when is_tuple(Tuple) -> ok;
+        Binary when is_binary(Binary) -> ok;
+        Int when is_integer(Int), Int >= 0 -> ok;
+        Priority -> error({invalid_priority, Priority, JobOpts})
+    end.
+
+
+flush_notifications(Ref) ->
+    receive
+        {?COUCH_JOBS_EVENT, Ref, _, _, _} ->
+            flush_notifications(Ref)
+    after
+        0 -> ok
+    end.
diff --git a/src/couch_jobs/src/couch_jobs.hrl b/src/couch_jobs/src/couch_jobs.hrl
new file mode 100644
index 0000000..60cbb31
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.hrl
@@ -0,0 +1,50 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+% JobOpts field definitions
+%
+-define(OPT_PRIORITY, <<"priority">>).
+-define(OPT_DATA, <<"data">>).
+-define(OPT_CANCEL, <<"cancel">>).
+-define(OPT_RESUBMIT, <<"resubmit">>).
+
+% These might be in a fabric public .hrl eventually
+%
+-define(uint2bin(I), binary:encode_unsigned(I, little)).
+-define(bin2uint(I), binary:decode_unsigned(I, little)).
+-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}).
+-define(METADATA_VERSION_KEY, <<"$metadata_version_key$">>).
+
+% Data model definitions
+% Switch these to numbers eventually.
+%
+-define(JOBS, <<"couch_jobs">>).
+-define(DATA, <<"data">>).
+-define(PENDING, <<"pending">>).
+-define(WATCHES, <<"watches">>).
+-define(ACTIVITY_TIMEOUT, <<"activity_timeout">>).
+-define(ACTIVITY, <<"activity">>).
+
+
+% Couch jobs event notifier tag
+-define(COUCH_JOBS_EVENT, '$couch_jobs_event').
+
+-type jtx() :: map().
+-type job_id() :: binary().
+-type job_type() :: tuple() | binary().
+-type job_opts() :: map().
+-type job_state() :: running | pending | finished.
+-type job_priority() :: tuple() | binary() | non_neg_integer().
+-type job_subscription() :: {pid(), reference()}.
+-type job_callback() :: fun((job_subscription(), job_type(), job_id(), job_state()) -> ok).
+-type worker_lock() :: binary().
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor.erl b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
new file mode 100644
index 0000000..009bb1e
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
@@ -0,0 +1,133 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_activity_monitor).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/1
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+-record(st, {
+    jtx,
+    type,
+    tref,
+    timeout = 0,
+    vs = null
+}).
+
+
+-define(MAX_JITTER_DEFAULT, 15000).
+
+
+start_link(Type) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [Type], []).
+
+
+%% gen_server callbacks
+
+init([Type]) ->
+    St = #st{jtx = couch_jobs_fdb:get_jtx(), type = Type},
+    {ok, schedule_check(St)}.
+
+
+terminate(_, _St) ->
+    ok.
+
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(check_activity, St) ->
+    St1 = check_activity(St),
+    St2 = schedule_check(St1),
+    {noreply, St2};
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+% Private helper functions
+
+check_activity(#st{vs = null} = St) ->
+    St#st{vs = get_vs(St)};
+
+
+check_activity(#st{} = St) ->
+    NewVS = get_vs(St),
+    St1 = case get_inactive_since(St) of
+        [] -> St;
+        [_ | _] -> re_enqueue_inactive(St)
+    end,
+    St1#st{vs = NewVS}.
+
+
+get_timeout_msec(JTx, Type) ->
+    TimeoutSec = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_activity_timeout(JTx1, Type)
+    end),
+    timer:seconds(TimeoutSec).
+
+
+schedule_check(#st{jtx = JTx, type = Type, timeout = OldTimeout} = St) ->
+    % Reset versionstamp if timeout changed.
+    St1 = case get_timeout_msec(JTx, Type) of
+        OldTimeout -> St;
+        NewTimeout -> St#st{vs = null, timeout = NewTimeout}
+    end,
+    #st{timeout = Timeout} = St1,
+    MaxJitter = max(Timeout, get_max_jitter_msec()),
+    Wait = Timeout + rand:uniform(min(1, MaxJitter)),
+    St1#st{tref = erlang:set_after(Wait, self(), check_activity)}.
+
+
+get_vs(#st{jtx = JTx, type = Type}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_activity_vs(JTx1, Type)
+    end).
+
+
+re_enqueue_inactive(#st{jtx = JTx, type = Type, vs = VS}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:re_enqueue_inactive(JTx1, Type, VS)
+    end).
+
+
+get_inactive_since(#st{jtx = JTx, type = Type, vs = VS}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_job_fdb:get_inactive_since(JTx1, Type, VS)
+    end).
+
+
+get_max_jitter_msec()->
+    config:get_integer("couch_jobs", "activity_monitor_max_jitter_msec",
+        ?MAX_JITTER_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl
new file mode 100644
index 0000000..943926e
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl
@@ -0,0 +1,57 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_activity_monitor_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0,
+
+    start_monitor/1,
+    stop_monitor/1
+]).
+
+-export([
+    init/1
+]).
+
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+start_monitor(Type) ->
+    supervisor:start_child(?MODULE, [Type]).
+
+
+stop_monitor(Pid) ->
+    supervisor:terminate_child(?MODULE, Pid).
+
+
+init(_) ->
+    Flags = #{
+        strategy => simple_one_for_one,
+        intensity => 10,
+        period => 3
+    },
+    Children = [
+        #{
+            id => couch_jobs_monitor,
+            restart => temporary,
+            start => {couch_jobs_activity_monitor, start_link, []}
+        }
+    ],
+    {ok, {Flags, Children}}.
diff --git a/src/couch_jobs/src/couch_jobs_app.erl b/src/couch_jobs/src/couch_jobs_app.erl
new file mode 100644
index 0000000..720b948
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_app.erl
@@ -0,0 +1,26 @@
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_app).
+
+
+-behaviour(application).
+
+
+-export([
+    start/2,
+    stop/1
+]).
+
+
+start(_Type, []) ->
+    couch_jobs_sup:start_link().
+
+
+stop([]) ->
+    ok.
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
new file mode 100644
index 0000000..d38ea73
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -0,0 +1,496 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_fdb).
+
+
+-export([
+    add/4,
+    remove/3,
+    resubmit/4,
+    get_job/3,
+
+    accept/2,
+    accept/3,
+    finish/5,
+    update/5,
+
+    set_activity_timeout/3,
+    get_activity_timeout/2,
+
+    get_types/1,
+
+    get_activity_vs/2,
+    get_activity_vs_and_watch/2,
+    get_active_since/3,
+    re_enqueue_inactive/3,
+
+    clear_type/2,
+
+    init_cache/0,
+
+    get_jtx/0,
+    get_jtx/1,
+
+    tx/2
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+% Data model
+%
+% (?JOBS, ?DATA, Type, JobId) = (Sequence, WorkerLockId, Priority, JobOpts)
+% (?JOBS, ?PENDING, Type, Priority, JobId) = ""
+% (?JOBS, ?WATCHES, Type) = Sequence
+% (?JOBS, ?ACTIVITY_TIMEOUT, Type) = ActivityTimeout
+% (?JOBS, ?ACTIVITY, Type, Sequence) = JobId
+
+% Job creation API
+
+add(#{jtx := true} = JTx0, Type, JobId, JobOpts) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case erlfdb:wait(erlfdb:get(Tx, Key)) of
+        <<_/binary>> ->
+            {error, duplicate_job};
+        not_found ->
+            maybe_enqueue(JTx, Type, JobId, JobOpts)
+    end.
+
+
+remove(#{jtx := true} = JTx0, Type, JobId) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx = get_jtx(JTx0),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job(Tx, Key) of
+        {_, WorkerLockId, _, _} = Job when WorkerLockId =/= null ->
+            ok = cancel(JTx, Key, Job),
+            canceled;
+        {_, _, null, _,  _} ->
+            erlfdb:clear(Tx, Key),
+            ok;
+        {_, _, Priority, _, _} ->
+            couch_jobs_pending:remove(JTx, Type, Priority, JobId),
+            erlfdb:clear(Tx, Key),
+            ok;
+        not_found ->
+            not_found
+    end.
+
+
+resubmit(#{jtx := true} = JTx, Type, JobId, NewPriority) ->
+    #{tx := Tx, jobs_path :=  Jobs} = get_jtx(JTx),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job(Tx, Key) of
+        {_, _, _, #{?OPT_RESUBMIT := true}} ->
+            % Already re-submitted
+            ok;
+        {Seq, WorkerLockId, Priority, #{} = JobOpts} ->
+            JobOpts1 = JobOpts#{?OPT_RESUBMIT => true},
+            JobOpts2 = update_priority(JobOpts1, NewPriority),
+            JobOptsEnc = jiffy:encode(JobOpts2),
+            Val = erlfdb_tuple:pack({Seq, WorkerLockId, Priority, JobOptsEnc}),
+            erlfdb:set(Tx, Key, Val),
+            ok;
+        not_found ->
+            not_found
+    end.
+
+
+get_job(#{jtx := true} = JTx, Type, JobId) ->
+    #{tx := Tx, jobs_path :=  Jobs} = get_jtx(JTx),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job(Tx, Key) of
+        {_, WorkerLockId, Priority, JobOpts} ->
+            {ok, JobOpts, job_state(WorkerLockId, Priority)};
+        not_found ->
+            not_found
+    end.
+
+
+% Worker public API
+
+accept(#{jtx := true} = JTx, Type) ->
+    accept(JTx, Type, undefined).
+
+
+accept(#{jtx := true} = JTx0, Type, MaxPriority) ->
+    #{jtx := true} = JTx = get_jtx(JTx0),
+    case couch_jobs_pending:dequeue(JTx, Type, MaxPriority) of
+        not_found ->
+            not_found;
+        <<_/binary>> = JobId ->
+            WorkerLockId = fabric2_util:uuid(),
+            update_lock(JTx, Type, JobId, WorkerLockId),
+            update_activity(JTx, Type, JobId),
+            {ok, JobId, WorkerLockId}
+    end.
+
+
+finish(#{jtx := true} = JTx0, Type, JobId, JobOpts, WorkerLockId) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job_and_status(Tx, Key, WorkerLockId) of
+        {Status, {Seq, _, _, JobOptsCur}} when
+                Status =:= ok orelse Status =:= canceled ->
+            % If the job was canceled, allow updating its data one last time
+            clear_activity(JTx, Type, Seq),
+            MergedOpts = maps:merge(JobOptsCur, JobOpts),
+            maybe_enqueue(JTx, Type, JobId, MergedOpts),
+            update_watch(JTx, Type),
+            ok;
+        {worker_conflict, _} ->
+            worker_conflict
+    end.
+
+
+resumbit(#{jtx := true} = JTx0, Type, JobId, NewPriority, WorkerLockId) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job_and_status(Tx, Key, WorkerLockId) of
+        {ok, {_, _, _, #{?OPT_RESUBMIT := true}}} ->
+            update_activity(JTx, Type, JobId),
+            ok; % Already re-submitted
+        {ok, {null, WorkerLockId, null, JobOpts}} ->
+            update_activity(JTx, Type, JobId),
+            JobOpts1 = JobOpts#{?OPT_RESUBMIT => true},
+            JobOpts2 = update_priority(JobOpts1, NewPriority),
+            JobOptsEnc = jiffy:encode(JobOpts2),
+            Val = erlfdb_tuple:pack({null, WorkerLockId, null, JobOptsEnc}),
+            erlfdb:set(Tx, Key, Val),
+            ok;
+        {ok, InvalidState} ->
+            error({couch_job_invalid_updata_state, InvalidState});
+        {Status, _} when Status =/= ok ->
+            Status
+    end.
+
+
+update(#{jtx := true} = JTx, Type, JobId, JobOpts, WorkerLockId) ->
+    #{tx := Tx, jobs_path :=  Jobs} = get_jtx(JTx),
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job_and_status(Tx, Key, WorkerLockId) of
+        {ok, {null, WorkerLockId, null, JobOptsCur}} ->
+            update_activity(JTx, Type, JobId),
+            JobOpts1 = maps:merge(JobOptsCur, JobOpts),
+            JobOptsEnc = jiffy:encode(JobOpts1),
+            Val = erlfdb_tuple:pack({null, WorkerLockId, null, JobOptsEnc}),
+            erlfdb:set(Tx, Key, Val),
+            ok;
+        {ok, InvalidState} ->
+            error({couch_job_invalid_updata_state, InvalidState});
+        {Status, _} when Status =/= ok ->
+            Status
+    end.
+
+
+% Type and activity monitoring API
+
+set_activity_timeout(#{jtx := true} = JTx, Type, Timeout) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+    Val = erlfdb_tuple:pack(Timeout),
+    erlfdb:set(Tx, Key, Val).
+
+
+get_activity_timeout(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+    Val = erlfdb:wait(erlfdb:get(Tx, Key)),
+    erlfdb_tuple:unpack(Val).
+
+
+get_types(#{jtx := true} = JTx) ->
+    #{tx := Tx, type := Type, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT}, Jobs),
+    Opts = [{streaming_mode, want_all}],
+    Future = erlfdb:get_range_startswith(Tx, Prefix, Opts),
+    lists:map(fun(K, _V) ->
+        Type = erfdb_tuple:unpack(K, Prefix)
+    end, erlfdb:wait(Future)).
+
+
+get_activity_vs(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Key = erlfdb_tuple:pack({?WATCHES, Type}, Jobs),
+    erlfdb:wait(erlfdb:get(Tx, Key)).
+
+
+get_activity_vs_and_watch(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Key = erlfdb_tuple:pack({?WATCHES, Type}, Jobs),
+    {Val, WatchFuture} = erlfdb:get_and_watch(Tx, Key),
+    {Val, WatchFuture}.
+
+
+get_active_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tupe:pack({?ACTIVITY}, Jobs),
+    StartKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
+    StartKeySel = erlfdb_key:first_greater_than(StartKey),
+    {_, EndKey} = erlfdb_tuple:range({Type}, Prefix),
+    Opts = [{streaming_mode, want_all}],
+    Future = erlfdb:get_range(Tx, StartKeySel, EndKey, Opts),
+    lists:map(fun({_K, JobId}) -> JobId end, erlfdb:wait(Future)).
+
+
+get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+    {StartKey, _} = erlfdb_tuple:range({Type}, Prefix),
+    EndKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
+    EndKeySel = erlfdb_key:last_less_or_equal(EndKey),
+    Opts = [{streaming_mode, want_all}],
+    Future = erlfdb:get_range(Tx, StartKey, EndKeySel, Opts),
+    lists:map(fun({_K, JobId}) -> JobId end, erlfdb:wait(Future)).
+
+
+re_enqueue_inactive(#{jtx := true} = JTx, Type, Versionstamp) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    JobIds = get_inactive_since(JTx, Type, Versionstamp),
+    lists:foreach(fun(JobId) ->
+        Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+        {Seq, _, _, JobOpts} = get_job(Tx, Key),
+        clear_activity(JTx, Type, Seq),
+        maybe_enqueue(JTx, Type, JobId, JobOpts)
+    end, JobIds),
+    case length(JobIds) > 0 of
+        true -> update_watch(JTx, Type);
+        false -> ok
+    end,
+    JobIds.
+
+
+% Debug and testing API
+
+clear_type(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+    lists:foreach(fun(Section) ->
+        Prefix = erlfdb_tuple:pack({Section, Type}, Jobs),
+        erlfdb:clear_range_startswith(Tx, Prefix)
+    end, [?DATA, ?PENDING, ?WATCHES, ?ACTIVITY_TIMEOUT, ?ACTIVITY]).
+
+
+% Cache initialization API. Called from the supervisor just to create the ETS
+% table. It returns `ignore` to tell supervisor it won't actually start any
+% process, which is what we want here.
+%
+init_cache() ->
+    ConcurrencyOpts = [{read_concurrency, true}, {write_concurrency, true}],
+    ets:new(?MODULE, [public, named_table] ++ ConcurrencyOpts),
+    ignore.
+
+
+% Cached job transaction object. This object wraps a transaction, caches the
+% directory lookup path, and the metadata version. The function can be used from
+% or outside the transaction. When used from a transaction it will verify if
+% the metadata was changed, and will refresh automatically.
+%
+get_jtx() ->
+    get_jtx(undefined).
+
+
+get_jtx(#{tx := Tx} = _TxDb) ->
+    get_jtx(Tx);
+
+get_jtx(undefined = _Tx) ->
+    case ets:lookup(?MODULE, ?JOBS) of
+        [{_, #{} = JTx}] -> JTx;
+        [] -> update_jtx_cache(init_jtx(undefined))
+    end;
+
+get_jtx({erlfdb_transaction, _} = Tx) ->
+    case ets:lookup(?MODULE, ?JOBS) of
+        [{_, #{} = JTx}] -> ensure_current(JTx#{tx := Tx});
+        [] -> update_jtx_cache(init_jtx(Tx))
+    end.
+
+
+% Transaction processing to be used with couch jobs' specific transaction
+% contexts
+%
+tx(#{jtx := true} = JTx, Fun) when is_function(Fun, 1) ->
+    fabric2_fdb:transactional(JTx, Fun).
+
+
+% Utility fdb functions used by other module in couch_job. Maybe move these to a
+% separate module if the list keep growing
+
+has_versionstamp(?UNSET_VS) ->
+    true;
+
+has_versionstamp(Tuple) when is_tuple(Tuple) ->
+    has_versionstamp(tuple_to_list(Tuple));
+
+has_versionstamp([Elem | Rest]) ->
+    has_versionstamp(Elem) orelse has_versionstamp(Rest);
+
+has_versionstamp(_Other) ->
+    false.
+
+
+% Private helper functions
+
+update_priority(JobOpts, undefined) ->
+    JobOpts;
+
+update_priority(JobOpts, NewPriority) ->
+    OldPriority = maps:get(?OPT_PRIORITY, JobOpts, undefined),
+    case NewPriority =/= OldPriority of
+        true -> JobOpts#{?OPT_PRIORITY => NewPriority};
+        false -> JobOpts
+    end.
+
+
+cancel(#{jx := true},  Key, {_, _, _, #{?OPT_CANCEL := true}}) ->
+    ok;
+
+cancel(#{jtx := true, tx := Tx}, Key, Job) ->
+    {Seq, WorkerLockId, Priority, JobOpts} = Job,
+    JobOpts1 = JobOpts#{?OPT_CANCEL => true},
+    JobOptsEnc = jiffy:encode(JobOpts1),
+    Val = erlfdb_tuple:pack({Seq, WorkerLockId, Priority, JobOptsEnc}),
+    erlfdb:set(Tx, Key, Val),
+    ok.
+
+
+maybe_enqueue(#{jtx := true} = JTx, Type, JobId, JobOpts) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    Resubmit = maps:get(?OPT_RESUBMIT, JobOpts, false) == true,
+    Cancel = maps:get(?OPT_CANCEL, JobOpts, false) == true,
+    JobOpts1 = maps:without([?OPT_RESUBMIT], JobOpts),
+    Priority = maps:get(?OPT_PRIORITY, JobOpts1, ?UNSET_VS),
+    JobOptsEnc = jiffy:encode(JobOpts1),
+    case Resubmit andalso not Cancel of
+        true ->
+            Val = erlfdb_tuple:pack_vs({null, null, Priority, JobOptsEnc}),
+            case couch_jobs_util:has_versionstamp(Priority) of
+                true -> erlfdb:set_versionstamped_value(Tx, Key, Val);
+                false -> erlfdb:set(Tx, Key, Val)
+            end,
+            couch_jobs_pending:enqueue(JTx, Type, Priority, JobId);
+        false ->
+            Val = erlfdb_tuple:pack({null, null, null, JobOptsEnc}),
+            erlfdb:set(Tx, Key, Val)
+    end,
+    ok.
+
+
+get_job(Tx = {erlfdb_transaction, _}, Key) ->
+    case erlfdb:wait(erlfdb:get(Tx, Key)) of
+        <<_/binary>> = Val ->
+            {Seq, WorkerLockId, Priority, JobOptsEnc} = erlfdb_tuple:unpack(Val),
+            JobOpts = jiffy:decode(JobOptsEnc, [return_maps]),
+            {Seq, WorkerLockId, Priority, JobOpts};
+        not_found ->
+            not_found
+    end.
+
+
+get_job_and_status(Tx, Key, WorkerLockId) ->
+    case get_job(Tx, Key) of
+        {_, LockId, _, _} = Res when WorkerLockId =/= LockId ->
+            {worker_conflict, Res};
+        {_, _, _, #{?OPT_CANCEL := true}} = Res ->
+            {canceled, Res};
+        {_, _, _, #{}} = Res ->
+            {ok, Res};
+        not_found ->
+            {worker_conflict, not_found}
+    end.
+
+
+update_activity(#{jtx := true} = JTx, Type, JobId) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx,
+    Key = erlfdb_tuple:pack_vs({?ACTIVITY, Type, ?UNSET_VS}, Jobs),
+    erlfdb:set_versionstamped_key(Tx, Key, JobId),
+    update_watch(JTx, Type).
+
+
+clear_activity(#{jtx := true} = JTx, Type, Seq) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?ACTIVITY, Type, Seq}, Jobs),
+    erlfdb:clear(Tx, Key).
+
+
+update_watch(#{jtx := true} = JTx, Type) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?WATCHES, Type}, Jobs),
+    erlfdb:set_versionstamped_value(Tx, Key, ?UNSET_VS).
+
+
+update_lock(#{jtx := true} = JTx, Type, JobId, WorkerLockId) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+    case get_job(Tx, Key) of
+        {null, null, _, JobOpts} ->
+            ValTup = {?UNSET_VS, WorkerLockId, null, jiffy:encode(JobOpts)},
+            Val = erlfdb_tuple:pack_vs(ValTup),
+            erlfdb:set_versionstamped_value(Tx, Key, Val);
+        InvalidState ->
+            error({couch_job_invalid_accept_state, InvalidState})
+    end.
+
+
+job_state(WorkerLockId, Priority) ->
+    case {WorkerLockId, Priority} of
+        {null, null} ->
+            finished;
+        {WorkerLockId, _} when WorkerLockId =/= null ->
+            running;
+        {_, Priority} when Priority =/= null ->
+            pending;
+        ErrorState ->
+            error({invalid_job_state, ErrorState})
+    end.
+
+
+
+
+
+% This a transaction context object similar to the Db = #{} one from fabric2_fdb.
+% It's is used to cache the jobs path directory (to avoid extra lookups on every
+% operation) and to check for metadata changes (in case directory changes).
+%
+init_jtx(undefined) ->
+    fabric2_fdb:transactional(fun(Tx) -> init_jtx(Tx) end);
+
+init_jtx({erlfdb_transaction, _} = Tx) ->
+    Root = erlfdb_directory:root(),
+    CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+    LayerPrefix = erlfdb_directory:get_name(CouchDB),
+    JobsPrefix = erlfdb_tuple:pack({?JOBS}, LayerPrefix),
+    Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
+    % layer_prefix, md_version and tx here match db map fields in fabric2_fdb
+    % but we also assert that this is a job transaction using the jtx => true field
+    #{
+        jtx => true,
+        tx => Tx,
+        layer_prefix => LayerPrefix,
+        jobs_prefix => JobsPrefix,
+        md_version => Version
+    }.
+
+
+ensure_current(#{jtx := true, tx := Tx, md_version := Version} = JTx) ->
+    case erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)) of
+        Version -> JTx;
+        _NewVersion -> update_jtx_cache(init_jtx(Tx))
+    end.
+
+
+update_jtx_cache(#{jtx := true} = JTx) ->
+    CachedJTx = JTx#{tx := undefined},
+    ets:insert(?MODULE, {?JOBS, CachedJTx}),
+    JTx.
diff --git a/src/couch_jobs/src/couch_jobs_notifier.erl b/src/couch_jobs/src/couch_jobs_notifier.erl
new file mode 100644
index 0000000..598b2f4
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_notifier.erl
@@ -0,0 +1,210 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_notifier).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/1,
+    subscribe/3,
+    subscribe/4,
+    unsubscribe/2
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-define(TYPE_MONITOR_HOLDOFF_DEFAULT, 1000).
+-define(TYPE_MONITOR_TIMEOUT_DEFAULT, 60000).
+
+
+-record(st, {
+    jtx,
+    type,
+    monitor_pid,
+    subs
+}).
+
+
+start_link(Type) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [Type], []).
+
+
+subscribe(Server, JobId, Pid) when is_pid(Pid) ->
+    gen_server:call(Server, {subscribe, JobId, nil, Pid}, infinity).
+
+
+subscribe(Server, JobId, Fun, Pid) when is_function(Fun, 4), is_pid(Pid) ->
+    gen_server:call(Server, {subscribe, JobId, Fun, Pid}, infinity).
+
+
+unsubscribe(Server, Ref) when is_reference(Ref) ->
+    gen_server:call(Server, {unsubscribe, Ref}, infinity).
+
+
+init([Type]) ->
+    EtsOpts = [ordered_set, protected],
+    JTx = couch_jobs_fdb:get_jtx(),
+    St = #st{jtx = JTx, type = Type, subs = ets:new(?MODULE, EtsOpts)},
+    VS = get_type_vs(St),
+    Pid = couch_jobs_type_monitor:start(Type, VS, get_holdoff(), get_timeout()),
+    {ok, St#st{monitor_pid = Pid}}.
+
+
+terminate(_, _St) ->
+    ok.
+
+
+handle_call({subscribe, JobId, Fun, Pid}, _From, #st{} = St) ->
+    Res = case get_job(St, JobId) of
+        not_found ->
+            not_found;
+        {ok, _, finished} ->
+            finished;
+        {ok, _, JobState} ->
+            Ref = erlang:monitor(process, Pid),
+            ets:insert(St#st.subs, {{JobId, Ref}, {Fun, Pid, JobState}}),
+            {Ref, JobState}
+    end,
+    {reply, Res, St};
+
+handle_call({unsubscribe, Ref}, _From, St) ->
+    true = ets:match_delete(?MODULE, {{'$1', Ref}, '_'}),
+    {reply, ok, St};
+
+% couch_jobs_type_monitor calls this
+handle_call({type_updated, VS}, _From, St) ->
+    ok = notify_subscribers(VS, St),
+    {reply, ok, St};
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+
+handle_info({'DOWN', Ref, process, _, _}, #st{subs = Subs} = St) ->
+    true = ets:match_delete(Subs, {{'$1', Ref}, '_'}),
+    {noreply, St};
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+get_job(#st{jtx = JTx, type = Type}, JobId) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_job(JTx1, Type, JobId)
+    end).
+
+
+get_jobs(#st{jtx = JTx, type = Type}, JobIds) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        lists:map(fun(JobId) ->
+            case couch_jobs_fdb:get_job(JTx1, Type, JobId) of
+                {ok, _, JobState} -> {JobId, JobState};
+                not_found -> {JobId, not_found}
+            end
+        end, JobIds)
+    end).
+
+
+get_type_vs(#st{jtx = JTx, type = Type}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_job_fdb:get_activity_vs(JTx1, Type)
+    end).
+
+
+% "Active since" is the list of jobs that have been active (running)
+% and updated at least once since the given versionstamp. These are relatively
+% cheap to find as it's just a range read in the ?ACTIVITY subspace.
+%
+get_active_since(#st{jtx = JTx, type = Type}, VS, SubscribedJobs) ->
+    AllUpdatedSet = sets:from_list(couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_active_since(JTx1, Type, VS)
+    end)),
+    SubscribedSet = sets:from_list(SubscribedJobs),
+    SubscribedActiveSet = sets:intersection(AllUpdatedSet, SubscribedSet),
+    sets:to_list(SubscribedActiveSet).
+
+
+get_subscribers(JobId, #st{subs = Subs}) ->
+    % Use ordered ets's fast matching of partial key prefixes here
+    lists:map(fun([Ref, {Fun, Pid, JobState}]) ->
+        {Ref, Fun, Pid, JobState}
+    end, ets:match(Subs, {{JobId, '$1'}, '$2'})).
+
+
+get_subscribed_job_ids(#st{subs = Subs}) ->
+    Matches = ets:match(Subs, {{'$1', '_'}, '_'}),
+    lists:usort(lists:flatten(Matches)).
+
+
+notify_subscribers(VS, #st{subs = Subs} = St) ->
+    JobIds = get_subscribed_job_ids(St),
+    % First gather the easy (cheap) active jobs. Then with those out of way,
+    % inspect each job to get its state.
+    Active = get_active_since(St, VS, JobIds),
+    JobStates = [{JobId, running} || JobId <- Active],
+    JobStates1 = JobStates ++ get_jobs(St, JobIds -- Active),
+    lists:foreach(fun({JobId, JobState}) ->
+        lists:foreach(fun
+            ({_Ref, _Fun, _Pid, State}) when State =:= JobState ->
+                ok;
+            ({Ref, Fun, Pid, _}) ->
+                notify(JobId, Ref, Fun, Pid, JobState, St)
+        end, get_subscribers(JobId, St)),
+        case lists:member(JobState, [finished, not_found]) of
+            true -> ets:match_delete(Subs, {{JobId, '_'}, '_'});
+            false -> ok
+        end
+    end, JobStates1).
+
+
+notify(JobId, Ref, Fun, _, JobState, St) when is_function(Fun, 4) ->
+    try
+        Fun(Ref, St#st.type, JobId, JobState)
+    catch
+        Tag:Err ->
+            ErrMsg = "~p : callback ~p failed ~p ~p =>  ~p:~p",
+            couch_log:error(ErrMsg, [?MODULE, Fun, JobId, JobState, Tag, Err])
+    end;
+
+notify(JobId, Ref, _, Pid, JobState, St) ->
+    Pid ! {?COUCH_JOBS_EVENT, Ref, St#st.type, JobId, JobState}.
+
+
+get_holdoff() ->
+    config:get_integer("couch_jobs", "type_monitor_holdoff_msec",
+        ?TYPE_MONITOR_HOLDOFF_DEFAULT).
+
+
+get_timeout() ->
+    config:get_integer("couch_jobs", "type_monitor_timeout_msec",
+        ?TYPE_MONITOR_TIMEOUT_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_notifier_sup.erl b/src/couch_jobs/src/couch_jobs_notifier_sup.erl
new file mode 100644
index 0000000..6705b45
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_notifier_sup.erl
@@ -0,0 +1,57 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_notifier_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0,
+
+    start_notifier/1,
+    stop_notifier/1
+]).
+
+-export([
+    init/1
+]).
+
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+start_notifier(Type) ->
+    supervisor:start_child(?MODULE, [Type]).
+
+
+stop_notifier(Pid) ->
+    supervisor:terminate_child(?MODULE, Pid).
+
+
+init(_) ->
+    Flags = #{
+        strategy => simple_one_for_one,
+        intensity => 10,
+        period => 3
+    },
+    Children = [
+        #{
+            id => couch_jobs_monitor,
+            restart => temporary,
+            start => {couch_jobs_notifier, start_link, []}
+        }
+    ],
+    {ok, {Flags, Children}}.
diff --git a/src/couch_jobs/src/couch_jobs_pending.erl b/src/couch_jobs/src/couch_jobs_pending.erl
new file mode 100644
index 0000000..d015c04
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_pending.erl
@@ -0,0 +1,150 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_pending).
+
+
+-export([
+    enqueue/4,
+    dequeue/3,
+    remove/4
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+% Make this configurable or auto-adjustable based on retries
+%
+-define(RANGE_LIMIT, 256).
+
+
+% Data model
+%
+% (?JOBS, ?PENDING, Type, Priority, JobId) = ""
+
+
+% Public API
+
+% Enqueue a job into the pending queue. Priority determines the place in the
+% queue.
+%
+enqueue(#{jtx := true} = JTx, Type, Priority, JobId) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    case couch_jobs_fdb:has_versionstamp(Priority) of
+        true ->
+            Key = erlfdb_tuple:pack_vs({?PENDING, Type, Priority, JobId}, Jobs),
+            erlfdb:set_versionstamped_key(Tx, Key, null);
+        false ->
+            Key = erlfdb_tuple:pack({?PENDING, Type, Priority, JobId}, Jobs),
+            erlfdb:set(Tx, Key, null)
+    end.
+
+
+% Dequeue a job from the front of the queue.
+%
+% If MaxPriority is specified, any job between the front of the queue and
+% up until MaxPriority is considered. Workers may randomly pick jobs in that
+% range. That can be used to avoid contention at the expense of strict dequeue
+% ordering. For instance if priorities are 0-high, 1-normal and 2-low, and we wish
+% to process normal and urgent jobs, then MaxPriority=1-normal would
+% accomplish that.
+%
+dequeue(#{jtx := true} = JTx, Type, undefined) ->
+    #{tx := Tx, jobs_path :=  Jobs} = JTx,
+    Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs),
+    case get_front_priority(Tx, Prefix) of
+        not_found ->
+            not_found;
+        Priority ->
+            {Start, End} = erlang_tuple:range(Priority, Prefix),
+            case clear_random_key_from_range(Tx, Start, End) of
+                not_found ->
+                    not_found;
+                <<_/binary>> = PendingKey ->
+                    {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+                    JobId
+            end
+    end;
+
+% If MaxPriority is not specified, only jobs with the same priority as the item
+% at the front of the queue are considered. Two extremes are useful to
+% consider:
+%
+%  * Priority is just one static value (say null, or "normal"). In that case,
+% the queue is effectively a bag of tasks that can be grabbed in any order,
+% which should minimize contention.
+%
+%  * Each job has a unique priority value, for example a versionstamp. In that
+%  case, the queue has strict FIFO behavior, but there will be more contention
+%  at the front of the queue.
+%
+dequeue(#{jtx := true} = JTx, Type, MaxPriority) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs),
+    StartKeySel = erlfdb_key:first_greater_than(Prefix),
+    End = erlfdb_tuple:pack({?PENDING, Type, MaxPriority, <<16#FF>>}, Prefix),
+    EndKeySel = erlfdb_key:last_less_than(End),
+    case clear_random_key_from_range(Tx, StartKeySel, EndKeySel) of
+        not_found ->
+            not_found;
+        <<_/binary>> = PendingKey ->
+            {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+            JobId
+   end.
+
+
+% Remove a job from the pending queue. This is used, for example, when a job is
+% canceled while it was waiting in the pending queue.
+%
+remove(#{jtx := true} = JTx, Type, Priority, JobId) ->
+    #{tx := Tx, jobs_path := Jobs} = JTx,
+    Key = erlfdb_tuple:pack({?PENDING, Type, Priority, JobId}, Jobs),
+    erlfdb:clear(Tx, Key).
+
+
+% Private functions
+
+% The priority of the item at the front. If there are multiple
+% items with the same priority, workers can randomly pick between them to
+% avoid contention.
+%
+get_front_priority(Tx, Prefix) ->
+    Opts = [{limit, 1}, {snapshot, true}],
+    case erlfdb:wait(erfldb:get_range_startswith(Tx, Prefix, Opts)) of
+        [] ->
+            not_found;
+        [{FrontKey, _}] ->
+            {Priority, _} = erlfdb_tuple:unpack(FrontKey, Prefix),
+            Priority
+    end.
+
+
+% Pick a random key from the range snapshot. Then radomly pick a key to
+% clear. Before clearing, ensure there is a read conflict on the key in
+% in case other workers have picked the same key.
+%
+clear_random_key_from_range(Tx, Start, End) ->
+    Opts = [
+        {limit, ?RANGE_LIMIT},
+        {snapshot, true}
+    ],
+    case erlfdb:wait(erlfdb:get_range(Tx, Start, End, Opts)) of
+        [] ->
+            not_found;
+        [{_, _} | _] = KVs ->
+            Index = rand:uniform(length(KVs)),
+            {Key, _} = lists:nth(Index),
+            erlfdb:add_read_conflict_key(Tx, Key),
+            erlfdb:clear(Tx, Key),
+            Key
+    end.
diff --git a/src/couch_jobs/src/couch_jobs_server.erl b/src/couch_jobs/src/couch_jobs_server.erl
new file mode 100644
index 0000000..9d533cd
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_server.erl
@@ -0,0 +1,153 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_server).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/0,
+    get_notifier_server/1,
+    force_check_types/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-define(TYPE_CHECK_PERIOD_DEFAULT, 5000).
+-define(MAX_JITTER_DEFAULT, 5000).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+% This is for testing and debugging mostly
+force_check_types() ->
+    get_server:call(?MODULE, force_check_types, infinity).
+
+
+get_notifier_server(Type) ->
+    case get_type_pid_refs(Type) of
+        {{_, _},  {NotifierPid, _}} -> {ok, NotifierPid};
+        not_found -> {error, not_found}
+    end.
+
+
+init(_) ->
+    ets:new(?MODULE, [protected, named_table]),
+    {ok, nil}.
+
+
+terminate(_, _St) ->
+    ok.
+
+
+handle_call(force_check_types, _From, St) ->
+    check_types(),
+    {reply, ok, St};
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(check_types, St) ->
+    check_types(),
+    schedule_check(),
+    {noreply, St};
+
+handle_info({'DOWN', _Ref, process, Pid, Reason}, St) ->
+    LogMsg = "~p : process ~p exited with ~p",
+    couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+    {stop, {unexpected_process_exit, Pid, Reason}, St};
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+check_types() ->
+    FdbTypes = fdb_types(),
+    EtsTypes = ets_types(),
+    ToStart = FdbTypes -- EtsTypes,
+    ToStop = EtsTypes -- FdbTypes,
+    lists:foreach(fun(Type) -> start_monitors(Type) end, ToStart),
+    lists:foreach(fun(Type) -> stop_monitors(Type) end, ToStop).
+
+
+start_monitors(Type) ->
+    MonPidRef = case couch_jobs_activity_monitor_sup:start_monitor(Type) of
+        {ok, Pid1} -> {Pid1, monitor(process, Pid1)};
+        {error, Error1} -> error({failed_to_start_monitor, Type, Error1})
+    end,
+    NotifierPidRef = case couch_jobs_notifier_sup:start_notifier(Type) of
+        {ok, Pid2} -> {Pid2, monitor(process, Pid2)};
+        {error, Error2} -> error({failed_to_start_notifier, Type, Error2})
+    end,
+    ets:insert_new(?MODULE, {Type, MonPidRef, NotifierPidRef}).
+
+
+stop_monitors(Type) ->
+    {{MonPid, MonRef}, {NotifierPid, NotifierRef}} = get_type_pid_refs(Type),
+    ok = couch_jobs_activity_monitor_sup:stop_monitor(MonPid),
+    demonitor(MonRef, [flush]),
+    ok = couch_jobs_notifier_sup:stop_notifier(NotifierPid),
+    demonitor(NotifierRef, [flush]).
+
+
+get_type_pid_refs(Type) ->
+    case ets:lookup(?MODULE, Type) of
+        [{_, MonPidRef, NotifierPidRef}] -> {MonPidRef, NotifierPidRef};
+        [] -> not_found
+    end.
+
+
+ets_types() ->
+    lists:flatten(ets:match(?MODULE, {'$1', '_', '_'})).
+
+
+fdb_types() ->
+     couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        couch_jobs_fdb:get_types(JTx)
+    end).
+
+
+schedule_check() ->
+    Timeout = get_period_msec(),
+    MaxJitter = max(Timeout, get_max_jitter_msec()),
+    Wait = Timeout + rand:uniform(min(1, MaxJitter)),
+    erlang:set_after(Wait, self(), check_types).
+
+
+get_period_msec() ->
+    config:get_integer("couch_jobs", "type_check_period_msec",
+        ?TYPE_CHECK_PERIOD_DEFAULT).
+
+
+get_max_jitter_msec() ->
+    config:get_integer("couch_jobs", "type_check_max_jitter_msec",
+        ?MAX_JITTER_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_sup.erl b/src/couch_jobs/src/couch_jobs_sup.erl
new file mode 100644
index 0000000..20d5440
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_sup.erl
@@ -0,0 +1,66 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1
+]).
+
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+    Flags = #{
+        strategy => rest_for_one,
+        intensity => 1,
+        period => 5
+    },
+    Children = [
+        #{
+            id => couch_jobs_fdb,
+            restart => transient,
+            start => {couch_jobs_fdb, init_cache, []}
+        },
+        #{
+            id => couch_jobs_activity_monitor_sup,
+            restart => permanent,
+            shutdown => brutal_kill,
+            type => supervisor,
+            start => {couch_jobs_activity_monitor_sup, start_link, []}
+        },
+        #{
+            id => couch_jobs_notifier_sup,
+            restart => permanent,
+            shutdown => brutal_kill,
+            type => supervisor,
+            start => {couch_jobs_notifier_sup, start_link, []}
+        },
+        #{
+            id => couch_jobs_server,
+            restart => permanent,
+            shutdown => brutal_kill,
+            start => {couch_jobs_server, start_link, []}
+        }
+    ],
+    {ok, {Flags, Children}}.
diff --git a/src/couch_jobs/src/couch_jobs_type_monitor.erl b/src/couch_jobs/src/couch_jobs_type_monitor.erl
new file mode 100644
index 0000000..84dd8fc
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_type_monitor.erl
@@ -0,0 +1,81 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_type_monitor).
+
+
+-export([
+    start/4,
+    stop/1
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-record(st, {
+    jtx,
+    type,
+    vs,
+    parent,
+    timestamp,
+    holdoff,
+    timeout
+}).
+
+
+start(Type, VS, HoldOff, Timeout) ->
+     Parent = self(),
+     spawn_link(fun() ->
+        loop(#st{
+            jtx = couch_jobs_fdb:get_jtx(),
+            type = Type,
+            vs = VS,
+            parent = Parent,
+            timestamp = 0,
+            holdoff = HoldOff,
+            timeout = Timeout
+        })
+    end).
+
+
+stop(Pid) ->
+    Ref = monitor(process, Pid),
+    unlink(Pid),
+    exit(Pid, kill),
+    receive {'DOWN', Ref, _, _, _} -> ok end.
+
+
+loop(#st{vs = VS, timeout = Timeout} = St) ->
+    {St1, Watch} = case get_vs_and_watch(St) of
+        {VS1, W} when VS1 =/= VS -> {notify(St#st{vs = VS1}), W};
+        {VS, W} -> {St, W}
+    end,
+    erlfdb:wait(Watch, [{timeout, Timeout}]),
+    loop(St1).
+
+
+notify(#st{} = St) ->
+    #st{holdoff = HoldOff, parent = Pid, timestamp = Ts, vs = VS} = St,
+    Now = erlang:system_time(millisecond),
+    case Now - Ts of
+        Dt when Dt < HoldOff -> timer:sleep(min(HoldOff - Dt, 0));
+        _ -> ok
+    end,
+    gen_server:call(Pid, {type_updated, VS}, infinity),
+    St#st{timestamp = Now}.
+
+
+get_vs_and_watch(#st{jtx = JTx, type = Type}) ->
+    couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+        couch_jobs_fdb:get_activity_vs_and_watch(JTx1, Type)
+    end).