You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/05/06 16:51:03 UTC

[couchdb] 01/01: Couch workers prototype

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/rfc-couch-workers
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 087cb0e8afd201d1c4a0cc3e324043a2f31f3ea5
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon May 6 12:49:22 2019 -0400

    Couch workers prototype
---
 rebar.config.script                            |   1 +
 rel/reltool.config                             |   2 +
 src/couch_workers/.gitignore                   |   4 +
 src/couch_workers/LICENSE                      | 202 ++++++++++++++++++++++
 src/couch_workers/README.md                    |   4 +
 src/couch_workers/src/couch_workers.app.src    |  30 ++++
 src/couch_workers/src/couch_workers.erl        |  54 ++++++
 src/couch_workers/src/couch_workers_app.erl    |  26 +++
 src/couch_workers/src/couch_workers_fdb.erl    | 140 +++++++++++++++
 src/couch_workers/src/couch_workers_global.erl | 228 +++++++++++++++++++++++++
 src/couch_workers/src/couch_workers_local.erl  | 155 +++++++++++++++++
 src/couch_workers/src/couch_workers_sup.erl    |  49 ++++++
 12 files changed, 895 insertions(+)

diff --git a/rebar.config.script b/rebar.config.script
index 3b58bcb..0069597 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -76,6 +76,7 @@ SubDirs = [
     "src/couch_tests",
     "src/ddoc_cache",
     "src/fabric",
+    "src/couch_workers",
     "src/global_changes",
     "src/mango",
     "src/rexi",
diff --git a/rel/reltool.config b/rel/reltool.config
index 1051d2e..ffb795a 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -34,6 +34,7 @@
         couch,
         couch_epi,
         couch_index,
+        couch_workers,
         couch_log,
         couch_mrview,
         couch_plugins,
@@ -90,6 +91,7 @@
     {app, config, [{incl_cond, include}]},
     {app, couch, [{incl_cond, include}]},
     {app, couch_epi, [{incl_cond, include}]},
+    {app, couch_workers, [{incl_cond, include}]},
     {app, couch_index, [{incl_cond, include}]},
     {app, couch_log, [{incl_cond, include}]},
     {app, couch_mrview, [{incl_cond, include}]},
diff --git a/src/couch_workers/.gitignore b/src/couch_workers/.gitignore
new file mode 100644
index 0000000..62c8b07
--- /dev/null
+++ b/src/couch_workers/.gitignore
@@ -0,0 +1,4 @@
+*.beam
+.eunit
+ebin/couch_workers.app
+.DS_Store
\ No newline at end of file
diff --git a/src/couch_workers/LICENSE b/src/couch_workers/LICENSE
new file mode 100644
index 0000000..f6cd2bc
--- /dev/null
+++ b/src/couch_workers/LICENSE
@@ -0,0 +1,202 @@
+
+                                Apache License
+                          Version 2.0, January 2004
+                       http://www.apache.org/licenses/
+
+  TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+  1. Definitions.
+
+     "License" shall mean the terms and conditions for use, reproduction,
+     and distribution as defined by Sections 1 through 9 of this document.
+
+     "Licensor" shall mean the copyright owner or entity authorized by
+     the copyright owner that is granting the License.
+
+     "Legal Entity" shall mean the union of the acting entity and all
+     other entities that control, are controlled by, or are under common
+     control with that entity. For the purposes of this definition,
+     "control" means (i) the power, direct or indirect, to cause the
+     direction or management of such entity, whether by contract or
+     otherwise, or (ii) ownership of fifty percent (50%) or more of the
+     outstanding shares, or (iii) beneficial ownership of such entity.
+
+     "You" (or "Your") shall mean an individual or Legal Entity
+     exercising permissions granted by this License.
+
+     "Source" form shall mean the preferred form for making modifications,
+     including but not limited to software source code, documentation
+     source, and configuration files.
+
+     "Object" form shall mean any form resulting from mechanical
+     transformation or translation of a Source form, including but
+     not limited to compiled object code, generated documentation,
+     and conversions to other media types.
+
+     "Work" shall mean the work of authorship, whether in Source or
+     Object form, made available under the License, as indicated by a
+     copyright notice that is included in or attached to the work
+     (an example is provided in the Appendix below).
+
+     "Derivative Works" shall mean any work, whether in Source or Object
+     form, that is based on (or derived from) the Work and for which the
+     editorial revisions, annotations, elaborations, or other modifications
+     represent, as a whole, an original work of authorship. For the purposes
+     of this License, Derivative Works shall not include works that remain
+     separable from, or merely link (or bind by name) to the interfaces of,
+     the Work and Derivative Works thereof.
+
+     "Contribution" shall mean any work of authorship, including
+     the original version of the Work and any modifications or additions
+     to that Work or Derivative Works thereof, that is intentionally
+     submitted to Licensor for inclusion in the Work by the copyright owner
+     or by an individual or Legal Entity authorized to submit on behalf of
+     the copyright owner. For the purposes of this definition, "submitted"
+     means any form of electronic, verbal, or written communication sent
+     to the Licensor or its representatives, including but not limited to
+     communication on electronic mailing lists, source code control systems,
+     and issue tracking systems that are managed by, or on behalf of, the
+     Licensor for the purpose of discussing and improving the Work, but
+     excluding communication that is conspicuously marked or otherwise
+     designated in writing by the copyright owner as "Not a Contribution."
+
+     "Contributor" shall mean Licensor and any individual or Legal Entity
+     on behalf of whom a Contribution has been received by Licensor and
+     subsequently incorporated within the Work.
+
+  2. Grant of Copyright License. Subject to the terms and conditions of
+     this License, each Contributor hereby grants to You a perpetual,
+     worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+     copyright license to reproduce, prepare Derivative Works of,
+     publicly display, publicly perform, sublicense, and distribute the
+     Work and such Derivative Works in Source or Object form.
+
+  3. Grant of Patent License. Subject to the terms and conditions of
+     this License, each Contributor hereby grants to You a perpetual,
+     worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+     (except as stated in this section) patent license to make, have made,
+     use, offer to sell, sell, import, and otherwise transfer the Work,
+     where such license applies only to those patent claims licensable
+     by such Contributor that are necessarily infringed by their
+     Contribution(s) alone or by combination of their Contribution(s)
+     with the Work to which such Contribution(s) was submitted. If You
+     institute patent litigation against any entity (including a
+     cross-claim or counterclaim in a lawsuit) alleging that the Work
+     or a Contribution incorporated within the Work constitutes direct
+     or contributory patent infringement, then any patent licenses
+     granted to You under this License for that Work shall terminate
+     as of the date such litigation is filed.
+
+  4. Redistribution. You may reproduce and distribute copies of the
+     Work or Derivative Works thereof in any medium, with or without
+     modifications, and in Source or Object form, provided that You
+     meet the following conditions:
+
+     (a) You must give any other recipients of the Work or
+         Derivative Works a copy of this License; and
+
+     (b) You must cause any modified files to carry prominent notices
+         stating that You changed the files; and
+
+     (c) You must retain, in the Source form of any Derivative Works
+         that You distribute, all copyright, patent, trademark, and
+         attribution notices from the Source form of the Work,
+         excluding those notices that do not pertain to any part of
+         the Derivative Works; and
+
+     (d) If the Work includes a "NOTICE" text file as part of its
+         distribution, then any Derivative Works that You distribute must
+         include a readable copy of the attribution notices contained
+         within such NOTICE file, excluding those notices that do not
+         pertain to any part of the Derivative Works, in at least one
+         of the following places: within a NOTICE text file distributed
+         as part of the Derivative Works; within the Source form or
+         documentation, if provided along with the Derivative Works; or,
+         within a display generated by the Derivative Works, if and
+         wherever such third-party notices normally appear. The contents
+         of the NOTICE file are for informational purposes only and
+         do not modify the License. You may add Your own attribution
+         notices within Derivative Works that You distribute, alongside
+         or as an addendum to the NOTICE text from the Work, provided
+         that such additional attribution notices cannot be construed
+         as modifying the License.
+
+     You may add Your own copyright statement to Your modifications and
+     may provide additional or different license terms and conditions
+     for use, reproduction, or distribution of Your modifications, or
+     for any such Derivative Works as a whole, provided Your use,
+     reproduction, and distribution of the Work otherwise complies with
+     the conditions stated in this License.
+
+  5. Submission of Contributions. Unless You explicitly state otherwise,
+     any Contribution intentionally submitted for inclusion in the Work
+     by You to the Licensor shall be under the terms and conditions of
+     this License, without any additional terms or conditions.
+     Notwithstanding the above, nothing herein shall supersede or modify
+     the terms of any separate license agreement you may have executed
+     with Licensor regarding such Contributions.
+
+  6. Trademarks. This License does not grant permission to use the trade
+     names, trademarks, service marks, or product names of the Licensor,
+     except as required for reasonable and customary use in describing the
+     origin of the Work and reproducing the content of the NOTICE file.
+
+  7. Disclaimer of Warranty. Unless required by applicable law or
+     agreed to in writing, Licensor provides the Work (and each
+     Contributor provides its Contributions) on an "AS IS" BASIS,
+     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+     implied, including, without limitation, any warranties or conditions
+     of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+     PARTICULAR PURPOSE. You are solely responsible for determining the
+     appropriateness of using or redistributing the Work and assume any
+     risks associated with Your exercise of permissions under this License.
+
+  8. Limitation of Liability. In no event and under no legal theory,
+     whether in tort (including negligence), contract, or otherwise,
+     unless required by applicable law (such as deliberate and grossly
+     negligent acts) or agreed to in writing, shall any Contributor be
+     liable to You for damages, including any direct, indirect, special,
+     incidental, or consequential damages of any character arising as a
+     result of this License or out of the use or inability to use the
+     Work (including but not limited to damages for loss of goodwill,
+     work stoppage, computer failure or malfunction, or any and all
+     other commercial damages or losses), even if such Contributor
+     has been advised of the possibility of such damages.
+
+  9. Accepting Warranty or Additional Liability. While redistributing
+     the Work or Derivative Works thereof, You may choose to offer,
+     and charge a fee for, acceptance of support, warranty, indemnity,
+     or other liability obligations and/or rights consistent with this
+     License. However, in accepting such obligations, You may act only
+     on Your own behalf and on Your sole responsibility, not on behalf
+     of any other Contributor, and only if You agree to indemnify,
+     defend, and hold each Contributor harmless for any liability
+     incurred by, or claims asserted against, such Contributor by reason
+     of your accepting any such warranty or additional liability.
+
+  END OF TERMS AND CONDITIONS
+
+  APPENDIX: How to apply the Apache License to your work.
+
+     To apply the Apache License to your work, attach the following
+     boilerplate notice, with the fields enclosed by brackets "[]"
+     replaced with your own identifying information. (Don't include
+     the brackets!)  The text should be enclosed in the appropriate
+     comment syntax for the file format. We also recommend that a
+     file or class name and description of purpose be included on the
+     same "printed page" as the copyright notice for easier
+     identification within third-party archives.
+
+  Copyright [yyyy] [name of copyright owner]
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
diff --git a/src/couch_workers/README.md b/src/couch_workers/README.md
new file mode 100644
index 0000000..8cfcd34
--- /dev/null
+++ b/src/couch_workers/README.md
@@ -0,0 +1,4 @@
+Couch Workers Application
+=========================
+
+Couch workers perform background jobs for CouchDB running on top of FoundationDB (FDB).
diff --git a/src/couch_workers/src/couch_workers.app.src b/src/couch_workers/src/couch_workers.app.src
new file mode 100644
index 0000000..7db57d6
--- /dev/null
+++ b/src/couch_workers/src/couch_workers.app.src
@@ -0,0 +1,30 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_workers, [
+    {description, "CouchDB Workers"},
+    {vsn, git},
+    {mod, {couch_workers_app, []}},
+    {registered, [
+        couch_workers_sup,
+        couch_workers_global,
+        couch_workers_local
+    ]},
+    {applications, [
+        kernel,
+        stdlib,
+        erlfdb,
+        couch_log,
+        config,
+        fabric
+    ]}
+]}.
diff --git a/src/couch_workers/src/couch_workers.erl b/src/couch_workers/src/couch_workers.erl
new file mode 100644
index 0000000..83f6996
--- /dev/null
+++ b/src/couch_workers/src/couch_workers.erl
@@ -0,0 +1,54 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers).
+
+-export([
+   worker_register/4,
+   worker_unregister/1,
+
+   membership_subscribe/3,
+   membership_unsubscribe/1,
+
+   get_workers/1
+]).
+
+
+%% Behaviour callback invoked by couch_workers_global with the role, the
+%% workers map, the membership version stamp and the subscriber reference.
+%% Workers is declared as map(): the type `#{}` denotes only the empty map.
+-callback couch_workers_membership_update(
+    Role :: term(),
+    Workers :: map(),
+    VStamp :: binary(),
+    SubscriberRef :: reference()
+) -> ok.
+
+
+% External API
+
+%% Register process Pid as worker Id for Role.
+%%   Id   - binary worker identifier
+%%   Opts - map of worker options, forwarded unchanged
+%%   Pid  - process backing the worker
+%% Delegates to couch_workers_local:worker_register/4 (the only arity that
+%% module exports) and returns its result.
+worker_register(Role, Id, Opts, Pid) when is_binary(Id), is_map(Opts),
+        is_pid(Pid) ->
+    couch_workers_local:worker_register(Role, Id, Opts, Pid).
+
+
+%% Remove the worker registration identified by Ref (must be a reference).
+%% The request is delegated to couch_workers_local.
+worker_unregister(Ref)
+        when is_reference(Ref) ->
+    couch_workers_local:worker_unregister(Ref).
+
+
+%% Subscribe Pid to membership updates for Role with Module as the callback
+%% module. Returns the result of couch_workers_global:subscribe/3.
+membership_subscribe(Role, Module, Pid) ->
+    Result = couch_workers_global:subscribe(Role, Module, Pid),
+    Result.
+
+
+%% Cancel the subscription identified by Ref (must be a reference) by
+%% delegating to couch_workers_global:unsubscribe/1.
+membership_unsubscribe(Ref)
+        when is_reference(Ref) ->
+    couch_workers_global:unsubscribe(Ref).
+
+
+%% Return the workers known for Role, as reported by
+%% couch_workers_global:get_workers/1.
+get_workers(Role) ->
+    Result = couch_workers_global:get_workers(Role),
+    Result.
diff --git a/src/couch_workers/src/couch_workers_app.erl b/src/couch_workers/src/couch_workers_app.erl
new file mode 100644
index 0000000..734dc1b
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_app.erl
@@ -0,0 +1,26 @@
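+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0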
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_app).
+
+
+-behaviour(application).
+
+
+-export([
+    start/2,
+    stop/1
+]).
+
+
+%% application callback: start the couch_workers supervisor. Only an empty
+%% start-argument list is accepted.
+start(_StartType, [] = _StartArgs) ->
+    couch_workers_sup:start_link().
+
+
+%% application callback: nothing to clean up. Only matches the `[]` state.
+stop([] = _State) ->
+    ok.
diff --git a/src/couch_workers/src/couch_workers_fdb.erl b/src/couch_workers/src/couch_workers_fdb.erl
new file mode 100644
index 0000000..b2c1933
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_fdb.erl
@@ -0,0 +1,140 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_fdb).
+
+-export([
+    set_worker/4,
+    clear_worker/3,
+    get_worker/3,
+
+    set_worker_health/5,
+    get_worker_health/3,
+    get_workers_health/2,
+
+    set_workers_vs/2,
+    get_workers_vs/2,
+
+    get_workers/2
+]).
+
+
+%% Switch these to numbers eventually
+-define(COUCH_WORKERS, <<"couch_workers">>).
+-define(WORKERS, <<"workers">>).
+-define(WORKERS_VS, <<"workers_vs">>).
+-define(HEALTH, <<"health">>).
+
+-define(uint2bin(I), binary:encode_unsigned(I, little)).
+-define(bin2uint(I), binary:decode_unsigned(I, little)).
+-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}).
+
+%% (?COUCH_WORKERS, Role, ?WORKERS_VS) = VS
+%% (?COUCH_WORKERS, Role, ?WORKERS, Worker) = Opts
+%% (?COUCH_WORKERS, Role, ?HEALTH, Worker) = (VS, TStamp, Timeout)
+
+
+%% Read the value stored for Worker under Role's ?WORKERS key.
+%% Returns the decoded term, or `not_found` when the key is absent.
+get_worker(Tx, Role, Worker) ->
+    RolePrefix = workers_prefix(Tx, Role),
+    WorkerKey = erlfdb_tuple:pack({?WORKERS, Worker}, RolePrefix),
+    Stored = erlfdb:wait(erlfdb:get(Tx, WorkerKey)),
+    case Stored of
+        not_found ->
+            not_found;
+        Bin when is_binary(Bin) ->
+            % decode with the `safe` option
+            binary_to_term(Bin, [safe])
+    end.
+
+
+%% Store Opts (term_to_binary encoded) for Worker under Role. When the worker
+%% has no existing entry, set_workers_vs/2 is called first.
+set_worker(Tx, Role, Worker, Opts) ->
+    WorkerKey = erlfdb_tuple:pack({?WORKERS, Worker}, workers_prefix(Tx, Role)),
+    Existing = get_worker(Tx, Role, Worker),
+    case Existing of
+        #{} ->
+            ok;
+        not_found ->
+            set_workers_vs(Tx, Role)
+    end,
+    Encoded = term_to_binary(Opts),
+    erlfdb:wait(erlfdb:set(Tx, WorkerKey, Encoded)).
+
+
+%% Remove Worker from Role. If the worker has an entry, its ?WORKERS and
+%% ?HEALTH keys are cleared and set_workers_vs/2 is called; otherwise this is
+%% a no-op. Always returns ok.
+clear_worker(Tx, Role, Worker) ->
+    case get_worker(Tx, Role, Worker) of
+        not_found ->
+            ok;
+        #{} ->
+            Prefix = workers_prefix(Tx, Role),
+            WPrefix = erlfdb_tuple:pack({?WORKERS, Worker}, Prefix),
+            erlfdb:clear_range_startswith(Tx, WPrefix),
+            HPrefix = erlfdb_tuple:pack({?HEALTH, Worker}, Prefix),
+            erlfdb:clear_range_startswith(Tx, HPrefix),
+            set_workers_vs(Tx, Role),
+            ok
+    end.
+
+
+%% Read the health record of Worker under Role.
+%% Returns {VS, TStamp, Timeout} as unpacked from the stored tuple, or
+%% `not_found` when no health record has been written.
+get_worker_health(Tx, Role, Worker) ->
+    Key = erlfdb_tuple:pack({?HEALTH, Worker}, workers_prefix(Tx, Role)),
+    case erlfdb:wait(erlfdb:get(Tx, Key)) of
+        not_found ->
+            not_found;
+        Val when is_binary(Val) ->
+            {VS, TStamp, Timeout} = erlfdb_tuple:unpack(Val),
+            {VS, TStamp, Timeout}
+    end.
+
+
+%% Read every health record stored under Role.
+%% Returns a map of Worker => {VS, TStamp, Timeout}. Values are decoded with
+%% erlfdb_tuple:unpack/1 because set_worker_health/5 writes them tuple-packed
+%% (they are not term_to_binary blobs).
+get_workers_health(Tx, Role) ->
+    Prefix = workers_prefix(Tx, Role),
+    {Start, End} = erlfdb_tuple:range({?HEALTH}, Prefix),
+    RawKVs = erlfdb:wait(erlfdb:get_range(Tx, Start, End)),
+    KVs = lists:map(fun({K, V}) ->
+        {?HEALTH, Worker} = erlfdb_tuple:unpack(K, Prefix),
+        {VS, TStamp, Timeout} = erlfdb_tuple:unpack(V),
+        {Worker, {VS, TStamp, Timeout}}
+    end, RawKVs),
+    maps:from_list(KVs).
+
+
+%% Write the health record (VS, TStamp, Timeout) for Worker under Role.
+%% TStamp and Timeout must be integers. The value holds an incomplete
+%% versionstamp, so it is packed with pack_vs/1 and written with a
+%% versionstamped set, letting the database fill in the real versionstamp at
+%% commit time instead of storing the 16#FF placeholder.
+set_worker_health(Tx, Role, Worker, TStamp, Timeout) when
+        is_integer(TStamp), is_integer(Timeout) ->
+    Key = erlfdb_tuple:pack({?HEALTH, Worker}, workers_prefix(Tx, Role)),
+    Val = erlfdb_tuple:pack_vs({?UNSET_VS, TStamp, Timeout}),
+    erlfdb:set_versionstamped_value(Tx, Key, Val).
+
+
+%% Read the raw value stored under Role's ?WORKERS_VS key.
+get_workers_vs(Tx, Role) ->
+    % return a watch here eventually
+    RolePrefix = workers_prefix(Tx, Role),
+    VSKey = erlfdb_tuple:pack({?WORKERS_VS}, RolePrefix),
+    Future = erlfdb:get(Tx, VSKey),
+    erlfdb:wait(Future).
+
+
+%% Bump the membership version stamp stored under Role's ?WORKERS_VS key.
+%% The value is an incomplete versionstamp written with a versionstamped set
+%% so that every commit stores a new value; a plain set would store the same
+%% placeholder bytes each time and pollers could never observe a change.
+set_workers_vs(Tx, Role) ->
+    % return a watch here eventually
+    Key = erlfdb_tuple:pack({?WORKERS_VS}, workers_prefix(Tx, Role)),
+    Val = erlfdb_tuple:pack_vs({?UNSET_VS}),
+    erlfdb:set_versionstamped_value(Tx, Key, Val).
+
+
+%% Read every worker entry stored under Role.
+%% Returns a map of Worker => decoded options term.
+get_workers(Tx, Role) ->
+    RolePrefix = workers_prefix(Tx, Role),
+    {RangeStart, RangeEnd} = erlfdb_tuple:range({?WORKERS}, RolePrefix),
+    Rows = erlfdb:wait(erlfdb:get_range(Tx, RangeStart, RangeEnd)),
+    lists:foldl(fun({Key, Val}, Acc) ->
+        {?WORKERS, Worker} = erlfdb_tuple:unpack(Key, RolePrefix),
+        Acc#{Worker => binary_to_term(Val, [safe])}
+    end, #{}, Rows).
+
+
+%% Build the key prefix for Role: the (?COUCH_WORKERS, Role) tuple packed
+%% under the name of the "couchdb" directory, which is created or opened
+%% inside the given transaction.
+workers_prefix(Tx, Role) ->
+    Path = [<<"couchdb">>],
+    Dir = erlfdb_directory:create_or_open(Tx, erlfdb_directory:root(), Path),
+    DirName = erlfdb_directory:get_name(Dir),
+    erlfdb_tuple:pack({?COUCH_WORKERS, Role}, DirName).
diff --git a/src/couch_workers/src/couch_workers_global.erl b/src/couch_workers/src/couch_workers_global.erl
new file mode 100644
index 0000000..efda3f0
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_global.erl
@@ -0,0 +1,228 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_global).
+
+-behaviour(gen_server).
+
+
+% Public API. get_workers/1 is exported because couch_workers:get_workers/1
+% calls it as a remote function.
+-export([
+    start_link/0,
+    subscribe/3,
+    unsubscribe/1,
+    get_workers/1
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-define(ROLE_MONITOR_POLL_INTERVAL_MSEC, 5000).
+
+-define(CACHE, couch_workers_global_cache).
+-define(SUBSCRIBERS, couch_workers_global_subscribers).
+-define(MONITOR_PIDS, couch_workers_global_monitor_pids).
+
+
+%% Start the server and register it locally under the module name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, nil, []).
+
+
+%% Ask the server to subscribe Pid to Role with callback module Module.
+%% The call waits for the reply without a timeout.
+subscribe(Role, Module, Pid) ->
+    Request = {subscribe, Role, Module, Pid},
+    gen_server:call(?MODULE, Request, infinity).
+
+
+%% Ask the server to drop the subscription identified by Ref.
+%% The call waits for the reply without a timeout.
+unsubscribe(Ref) ->
+    Request = {unsubscribe, Ref},
+    gen_server:call(?MODULE, Request, infinity).
+
+
+%% Read the cached membership for Role straight from the ?CACHE ETS table.
+%% Returns {ok, VS, Workers}, or {error, not_found} when nothing is cached.
+get_workers(Role) ->
+    Cached = ets:lookup(?CACHE, Role),
+    case Cached of
+        [] ->
+            {error, not_found};
+        [{Role, VS, Workers}] ->
+            {ok, VS, Workers}
+    end.
+
+
+%% gen_server callbacks
+
+%% gen_server init: create the three named, protected ETS tables used by this
+%% server. The server state itself is `nil`.
+init(_) ->
+    % ?CACHE rows: {Role, VS, Workers}
+    ets:new(?CACHE, [protected, named_table, {read_concurrency, true}]),
+    % ?SUBSCRIBERS rows: {{Role, Ref}, Module, VS}
+    ets:new(?SUBSCRIBERS, [protected, named_table, ordered_set]),
+    % ?MONITOR_PIDS rows: {Role, Pid}
+    ets:new(?MONITOR_PIDS, [protected, named_table]),
+    {ok, nil}.
+
+
+%% gen_server terminate callback: no cleanup is performed.
+terminate(_Reason, _State) ->
+    ok.
+
+
+%% gen_server call handler.
+%%   {subscribe, Role, Mod, Pid} - monitor Pid, record the subscription, reply
+%%       with the monitor reference, then run the callback with the cached
+%%       membership if there is one
+%%   {unsubscribe, Ref}          - drop the subscription, reply ok
+%%   {membership_update, Role, VS, Workers} - update cache/notify, reply ok
+%% Any other request stops the server with reason and reply {bad_call, Msg}.
+handle_call({subscribe, Role, Mod, Pid}, From, St) ->
+    SubRef = erlang:monitor(process, Pid),
+    subscribe_int(Role, SubRef, Mod),
+    % the reply is sent explicitly here, before the callback is invoked
+    gen_server:reply(From, SubRef),
+    Cached = get_workers(Role),
+    case Cached of
+        {error, not_found} ->
+            ok;
+        {ok, VS, Workers} ->
+            do_callback(Mod, Role, Workers, VS, SubRef)
+    end,
+    {noreply, St};
+
+handle_call({unsubscribe, SubRef}, _From, St) ->
+    unsubscribe_int(SubRef),
+    {reply, ok, St};
+
+handle_call({membership_update, Role, VS, Workers}, _From, St) ->
+    membership_update(Role, VS, Workers),
+    {reply, ok, St};
+
+handle_call(Unknown, _From, St) ->
+    BadCall = {bad_call, Unknown},
+    {stop, BadCall, BadCall, St}.
+
+
+%% gen_server cast handler: no casts are supported, so any cast stops the
+%% server with reason {bad_cast, Msg}.
+handle_cast(Unknown, State) ->
+    Reason = {bad_cast, Unknown},
+    {stop, Reason, State}.
+
+
+%% gen_server info handler. A 'DOWN' message for a monitored subscriber
+%% removes its subscription; any other message stops the server with
+%% {bad_info, Msg}. handle_info/2 has no caller to reply to, so the 'DOWN'
+%% clause returns `noreply` (a `reply` tuple is not a valid return here).
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) ->
+    unsubscribe_int(Ref),
+    {noreply, St};
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+%% gen_server code_change callback: the state is carried over unchanged.
+code_change(_FromVsn, State, _ExtraArgs) ->
+    {ok, State}.
+
+
+%% Utility functions
+
+%% Store the new membership for Role in the cache and notify every subscriber
+%% of that role whose recorded version stamp differs from VS.
+%% The comparison uses a guard: a variable named VS in the fun head would
+%% shadow the outer VS and match every subscriber. After a callback the
+%% subscriber row is updated with the VS that was delivered.
+membership_update(Role, VS, Workers) ->
+    true = ets:insert(?CACHE, {Role, VS, Workers}),
+    lists:foreach(fun
+        ({_Ref, _Mod, SubVS}) when SubVS =:= VS ->
+            ok;
+        ({Ref, Mod, _OldVS}) ->
+            do_callback(Mod, Role, Workers, VS, Ref),
+            % element 3 of a ?SUBSCRIBERS row is the last delivered VS
+            ets:update_element(?SUBSCRIBERS, {Role, Ref}, {3, VS})
+    end, find_subscribers(Role)).
+
+
+%% Invoke Mod:couch_workers_membership_update/4. A `nil` module means no
+%% callback is made. Any exception raised by the callback is caught and
+%% logged via couch_log:error/2.
+do_callback(nil, _Role, _Workers, _VS, _Ref) ->
+    % User didn't want a callback, they'll be polling the cache
+    ok;
+
+do_callback(Mod, Role, Workers, VS, Ref) ->
+    try
+        Mod:couch_workers_membership_update(Role, Workers, VS, Ref)
+    catch
+        Class:Reason ->
+            Fmt = "~p : failed when calling callback Mod:~p Role:~p ~p:~p",
+            Args = [?MODULE, Mod, Role, Class, Reason],
+            couch_log:error(Fmt, Args)
+    end.
+
+
+%% Record a subscription row for {Role, Ref} with an initial VS of `nil`,
+%% then make sure a monitor process exists for Role.
+subscribe_int(Role, Ref, Module) ->
+    Row = {{Role, Ref}, Module, nil},
+    true = ets:insert(?SUBSCRIBERS, Row),
+    maybe_start_role_monitor(Role).
+
+
+%% Remove the subscription identified by Ref. When it was the last
+%% subscription for its role, the role's cache entry is deleted and its
+%% monitor process is stopped. Unknown references are ignored. Returns ok.
+unsubscribe_int(Ref) ->
+    case find_subscriber_role(Ref) of
+        {ok, Role} ->
+            % Ref is the monitor created at subscribe time; release it and
+            % flush any 'DOWN' message already queued for it
+            true = erlang:demonitor(Ref, [flush]),
+            true = ets:delete(?SUBSCRIBERS, {Role, Ref}),
+            case find_subscribers(Role) of
+                [] ->
+                    true = ets:delete(?CACHE, Role),
+                    stop_role_monitor(Role);
+                [_ | _] ->
+                    % one or more subscribers remain for this role
+                    ok
+            end;
+        {error, not_found} ->
+            ok
+    end.
+
+
+%% Find the role a subscriber reference belongs to.
+%% Returns {ok, Role} or {error, not_found}. ?SUBSCRIBERS rows are 3-tuples
+%% ({{Role, Ref}, Module, VS}), so the match pattern must also have three
+%% elements.
+find_subscriber_role(Ref) ->
+    case ets:match(?SUBSCRIBERS, {{'$1', Ref}, '_', '_'}) of
+        [] -> {error, not_found};
+        [[Role]] -> {ok, Role}
+    end.
+
+
+%% Return every subscriber of Role as a list of {Ref, Mod, VS} tuples.
+find_subscribers(Role) ->
+    % Using ETS partial key match in ordered set
+    Pattern = {{Role, '$1'}, '$2', '$3'},
+    Matches = ets:match(?SUBSCRIBERS, Pattern),
+    % each match is the 3-element list [Ref, Mod, VS]
+    lists:map(fun([Ref, Mod, VS]) -> {Ref, Mod, VS} end, Matches).
+
+
+%% Start a linked polling process for Role unless one is already recorded in
+%% ?MONITOR_PIDS. Returns `false` when one exists, `true` after starting one.
+maybe_start_role_monitor(Role) ->
+    Known = ets:lookup(?MONITOR_PIDS, Role),
+    case Known of
+        [] ->
+            Server = self(),
+            Interval = ?ROLE_MONITOR_POLL_INTERVAL_MSEC,
+            Loop = fun() -> role_monitor_loop(Role, nil, Server, Interval) end,
+            MonitorPid = spawn_link(Loop),
+            true = ets:insert(?MONITOR_PIDS, {Role, MonitorPid});
+        [{_Role, _Pid}] ->
+            false
+    end.
+
+
+%% Kill the polling process recorded for Role and wait until it is down.
+%% The process is unlinked before being killed.
+stop_role_monitor(Role) ->
+    [{_, MonitorPid}] = ets:lookup(?MONITOR_PIDS, Role),
+    true = ets:delete(?MONITOR_PIDS, Role),
+    MRef = erlang:monitor(process, MonitorPid),
+    unlink(MonitorPid),
+    exit(MonitorPid, kill),
+    receive
+        {'DOWN', MRef, _Type, _Object, _Info} ->
+            ok
+    end.
+
+
+
+% Replace with a watch eventually but with a polling fallback
+% in case we make too many watches
+%
+% Poll the membership version stamp of Role every PollMSec milliseconds.
+% When it differs from the last seen VS, fetch the workers and report them to
+% ReportPid with a synchronous {membership_update, Role, VS, Workers} call
+% (the tag handled by handle_call/3). A missing version stamp key
+% (`not_found`) is treated as "nothing to report yet".
+role_monitor_loop(Role, VS, ReportPid, PollMSec) ->
+    NewVS = case get_workers_vs(Role) of
+        VS ->
+            VS;
+        not_found ->
+            VS;
+        OtherVS when is_binary(OtherVS) ->
+            {VS1, Workers} = get_workers_and_vs(Role),
+            CallMsg = {membership_update, Role, VS1, Workers},
+            ok = gen_server:call(ReportPid, CallMsg, infinity),
+            VS1
+    end,
+    timer:sleep(PollMSec),
+    role_monitor_loop(Role, NewVS, ReportPid, PollMSec).
+
+
+%% Read the membership version stamp of Role in its own transaction.
+get_workers_vs(Role) ->
+    ReadVS = fun(Tx) -> couch_workers_fdb:get_workers_vs(Tx, Role) end,
+    fabric2_fdb:transactional(ReadVS).
+
+
+%% Read the membership version stamp and the workers of Role within a single
+%% transaction. Returns {VS, Workers}.
+get_workers_and_vs(Role) ->
+    fabric2_fdb:transactional(fun(Tx) ->
+        % VS must come from get_workers_vs/2, not from the workers map
+        VS = couch_workers_fdb:get_workers_vs(Tx, Role),
+        Workers = couch_workers_fdb:get_workers(Tx, Role),
+        {VS, Workers}
+    end).
diff --git a/src/couch_workers/src/couch_workers_local.erl b/src/couch_workers/src/couch_workers_local.erl
new file mode 100644
index 0000000..24cd2fd
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_local.erl
@@ -0,0 +1,155 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_local).
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/0,
+    worker_register/4,
+    worker_unregister/1
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-define(DEFAULT_HEALTH_TIMEOUT_SEC, 15).
+
+
+%% Start the server and register it locally under the module name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, nil, []).
+
+
+%% Ask the server to register Pid as worker Id for Role with options Opts.
+%% The call waits for the reply without a timeout.
+worker_register(Role, Id, Opts, Pid) ->
+    Request = {worker_register, Role, Id, Opts, Pid},
+    gen_server:call(?MODULE, Request, infinity).
+
+
+%% Ask the server to remove the worker registration identified by Ref.
+%% The request tag must be `worker_unregister`, the one handle_call/3 matches.
+%% The call waits for the reply without a timeout.
+worker_unregister(Ref) ->
+    gen_server:call(?MODULE, {worker_unregister, Ref}, infinity).
+
+
+%% gen_server init: create the named, protected ETS table (named after this
+%% module) that holds the registrations. The server state is `nil`.
+init(_) ->
+    % rows: {Ref, Role, Id, HealthPid}
+    TableOpts = [protected, named_table],
+    ets:new(?MODULE, TableOpts),
+    {ok, nil}.
+
+
+%% gen_server terminate callback: no cleanup is performed.
+terminate(_Reason, _State) ->
+    ok.
+
+
+%% gen_server call handler.
+%%   {worker_register, Role, Id, Opts, Pid} - register the worker and reply
+%%       with the reference returned by worker_register_int/4. A reply is
+%%       required: the client calls with an infinite timeout and would
+%%       otherwise block forever.
+%%   {worker_unregister, Ref} - remove the registration, reply ok
+%% Any other request stops the server with reason and reply {bad_call, Msg}.
+handle_call({worker_register, Role, Id, Opts, Pid}, _From, St) ->
+    Ref = worker_register_int(Role, Id, Opts, Pid),
+    {reply, Ref, St};
+
+handle_call({worker_unregister, Ref}, _From, St) ->
+    worker_unregister_int(Ref),
+    {reply, ok, St};
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+%% gen_server cast handler: no casts are supported, so any cast stops the
+%% server with reason {bad_cast, Msg}.
+handle_cast(Unknown, State) ->
+    Reason = {bad_cast, Unknown},
+    {stop, Reason, State}.
+
+
+%% gen_server info handler. A 'DOWN' message for a monitored worker process
+%% removes its registration; any other message stops the server with
+%% {bad_info, Msg}.
+%% The internal helper is called directly: worker_unregister/1 is a
+%% gen_server:call to this same process and would deadlock. handle_info/2
+%% has no caller to reply to, so the clause returns `noreply`.
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) ->
+    worker_unregister_int(Ref),
+    {noreply, St};
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+%% gen_server code_change callback: the state is carried over unchanged.
+code_change(_FromVsn, State, _ExtraArgs) ->
+    {ok, State}.
+
+
+%% Utility functions
+
+%% Register worker Id for Role, or return the existing reference when that
+%% {Role, Id} pair is already in the table.
+%% Parameters are (Role, Id, Opts, Pid), the order used by handle_call/3.
+%% For a new worker: monitor Pid, store Opts, start a linked health pinger
+%% using Opts' `timeout` (default ?DEFAULT_HEALTH_TIMEOUT_SEC) and record the
+%% row. Returns the monitor reference.
+worker_register_int(Role, Id, Opts, Pid) ->
+    case ets:match(?MODULE, {'$1', Role, Id, '_'}) of
+        [] ->
+            Ref = erlang:monitor(process, Pid),
+            ok = set_worker(Role, Id, Opts),
+            Timeout = maps:get(timeout, Opts, ?DEFAULT_HEALTH_TIMEOUT_SEC),
+            HPid = spawn_link(fun() ->
+                health_pinger_loop(Role, Id, Timeout)
+            end),
+            true = ets:insert(?MODULE, {Ref, Role, Id, HPid}),
+            Ref;
+        [[Ref]] ->
+            Ref
+    end.
+
+
+%% Remove the registration identified by Ref: release the process monitor,
+%% clear the worker's stored data, kill its health pinger and delete the
+%% table row. An unknown reference is logged as an error. Always returns ok.
+worker_unregister_int(Ref) ->
+    case ets:lookup(?MODULE, Ref) of
+        [{_, Role, Id, HealthPid}] ->
+            % Ref is the monitor created at registration; without releasing
+            % it a later 'DOWN' would hit the "unknown reference" branch
+            true = erlang:demonitor(Ref, [flush]),
+            ok = clear_worker(Role, Id),
+            kill_health_pinger(HealthPid),
+            true = ets:delete(?MODULE, Ref),
+            ok;
+        [] ->
+            couch_log:error("~p : unknown worker reference ~p", [?MODULE, Ref]),
+            ok
+    end.
+
+
+%% Current OS time as whole seconds, computed from os:timestamp/0.
+now_sec() ->
+    {MegaSecs, Secs, _MicroSecs} = os:timestamp(),
+    Secs + MegaSecs * 1000000.
+
+
+%% Kill the health pinger process Pid and wait until it is down.
+%% The process is unlinked before being killed. Returns ok.
+kill_health_pinger(Pid) when is_pid(Pid) ->
+    MRef = erlang:monitor(process, Pid),
+    unlink(Pid),
+    exit(Pid, kill),
+    receive
+        {'DOWN', MRef, _Type, _Object, _Info} ->
+            ok
+    end.
+
+
+%% Endless loop that records a health timestamp for worker Id of Role and
+%% then sleeps a third of Timeout (seconds), with a 10 ms minimum.
+%% Integer division is used because timer:sleep/1 needs an integer number of
+%% milliseconds; `/` would produce a float.
+health_pinger_loop(Role, Id, Timeout) ->
+    set_worker_health(Role, Id, now_sec(), Timeout),
+    % todo: add jitter here
+    timer:sleep(max(10, Timeout * 1000 div 3)),
+    health_pinger_loop(Role, Id, Timeout).
+
+
+%% Write the health record of Worker for Role in its own transaction.
+set_worker_health(Role, Worker, TStamp, Timeout) ->
+    Write = fun(Tx) ->
+        couch_workers_fdb:set_worker_health(Tx, Role, Worker, TStamp, Timeout)
+    end,
+    fabric2_fdb:transactional(Write).
+
+
+%% Store Opts for Worker under Role in its own transaction. Returns ok.
+set_worker(Role, Worker, Opts) ->
+    Store = fun(Tx) -> couch_workers_fdb:set_worker(Tx, Role, Worker, Opts) end,
+    fabric2_fdb:transactional(Store),
+    ok.
+
+
+%% Remove Worker's stored data for Role in its own transaction. Returns ok.
+clear_worker(Role, Worker) ->
+    Clear = fun(Tx) -> couch_workers_fdb:clear_worker(Tx, Role, Worker) end,
+    fabric2_fdb:transactional(Clear),
+    ok.
diff --git a/src/couch_workers/src/couch_workers_sup.erl b/src/couch_workers/src/couch_workers_sup.erl
new file mode 100644
index 0000000..15a0a3b
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_sup.erl
@@ -0,0 +1,49 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1
+]).
+
+
+%% Start the supervisor and register it locally under the module name.
+start_link() ->
+    SupName = {local, ?MODULE},
+    supervisor:start_link(SupName, ?MODULE, []).
+
+
+%% supervisor init: one_for_one supervision (at most 1 restart in 5 seconds)
+%% of the global and local worker servers.
+%% Child ids must be unique within a supervisor, so each child uses its own
+%% module name as id.
+init([]) ->
+    Flags = #{
+        strategy => one_for_one,
+        intensity => 1,
+        period => 5
+    },
+    Children = [
+        #{
+            id => couch_workers_global,
+            start => {couch_workers_global, start_link, []}
+        },
+        #{
+            id => couch_workers_local,
+            start => {couch_workers_local, start_link, []}
+        }
+    ],
+    {ok, {Flags, Children}}.