You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/05/06 16:51:03 UTC
[couchdb] 01/01: Couch workers prototype
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch prototype/rfc-couch-workers
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 087cb0e8afd201d1c4a0cc3e324043a2f31f3ea5
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon May 6 12:49:22 2019 -0400
Couch workers prototype
---
rebar.config.script | 1 +
rel/reltool.config | 2 +
src/couch_workers/.gitignore | 4 +
src/couch_workers/LICENSE | 202 ++++++++++++++++++++++
src/couch_workers/README.md | 4 +
src/couch_workers/src/couch_workers.app.src | 30 ++++
src/couch_workers/src/couch_workers.erl | 54 ++++++
src/couch_workers/src/couch_workers_app.erl | 26 +++
src/couch_workers/src/couch_workers_fdb.erl | 140 +++++++++++++++
src/couch_workers/src/couch_workers_global.erl | 228 +++++++++++++++++++++++++
src/couch_workers/src/couch_workers_local.erl | 155 +++++++++++++++++
src/couch_workers/src/couch_workers_sup.erl | 49 ++++++
12 files changed, 895 insertions(+)
diff --git a/rebar.config.script b/rebar.config.script
index 3b58bcb..0069597 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -76,6 +76,7 @@ SubDirs = [
"src/couch_tests",
"src/ddoc_cache",
"src/fabric",
+ "src/couch_workers",
"src/global_changes",
"src/mango",
"src/rexi",
diff --git a/rel/reltool.config b/rel/reltool.config
index 1051d2e..ffb795a 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -34,6 +34,7 @@
couch,
couch_epi,
couch_index,
+ couch_workers,
couch_log,
couch_mrview,
couch_plugins,
@@ -90,6 +91,7 @@
{app, config, [{incl_cond, include}]},
{app, couch, [{incl_cond, include}]},
{app, couch_epi, [{incl_cond, include}]},
+ {app, couch_workers, [{incl_cond, include}]},
{app, couch_index, [{incl_cond, include}]},
{app, couch_log, [{incl_cond, include}]},
{app, couch_mrview, [{incl_cond, include}]},
diff --git a/src/couch_workers/.gitignore b/src/couch_workers/.gitignore
new file mode 100644
index 0000000..62c8b07
--- /dev/null
+++ b/src/couch_workers/.gitignore
@@ -0,0 +1,4 @@
+*.beam
+.eunit
+ebin/couch_workers.app
+.DS_Store
\ No newline at end of file
diff --git a/src/couch_workers/LICENSE b/src/couch_workers/LICENSE
new file mode 100644
index 0000000..f6cd2bc
--- /dev/null
+++ b/src/couch_workers/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/src/couch_workers/README.md b/src/couch_workers/README.md
new file mode 100644
index 0000000..8cfcd34
--- /dev/null
+++ b/src/couch_workers/README.md
@@ -0,0 +1,4 @@
+Couch Workers Application
+=========================
+
+Couch workers perform background jobs for CouchDB running on top of FDB
diff --git a/src/couch_workers/src/couch_workers.app.src b/src/couch_workers/src/couch_workers.app.src
new file mode 100644
index 0000000..7db57d6
--- /dev/null
+++ b/src/couch_workers/src/couch_workers.app.src
@@ -0,0 +1,30 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_workers, [
+ {description, "CouchDB Workers"},
+ {vsn, git},
+ {mod, {couch_workers_app, []}},
+ {registered, [
+ couch_workers_sup,
+ couch_workers_global,
+ couch_workers_local
+ ]},
+ {applications, [
+ kernel,
+ stdlib,
+ erlfdb,
+ couch_log,
+ config,
+ fabric
+ ]}
+]}.
diff --git a/src/couch_workers/src/couch_workers.erl b/src/couch_workers/src/couch_workers.erl
new file mode 100644
index 0000000..83f6996
--- /dev/null
+++ b/src/couch_workers/src/couch_workers.erl
@@ -0,0 +1,54 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers).
+
+-export([
+ worker_register/4,
+ worker_unregister/1,
+
+ membership_subscribe/3,
+ membership_unsubscribe/1,
+
+ get_workers/1
+]).
+
+
+-callback couch_workers_membership_update(
+ Role :: term(),
+ Workers :: #{},
+ VStamp :: binary(),
+ SubscriberRef :: reference()
+) -> ok.
+
+
+% External API
+
+worker_register(Role, Id, Opts, Pid) when is_binary(Id), is_map(Opts),
+ is_pid(Pid) ->
+ couch_workers_local:worker_register(Role, Id, Pid).
+
+
+worker_unregister(Ref) when is_reference(Ref) ->
+ couch_workers_local:worker_unregister(Ref).
+
+
+membership_subscribe(Role, Module, Pid) ->
+ couch_workers_global:subscribe(Role, Module, Pid).
+
+
+membership_unsubscribe(Ref) when is_reference(Ref) ->
+ couch_workers_global:unsubscribe(Ref).
+
+
+get_workers(Role) ->
+ couch_workers_global:get_workers(Role).
diff --git a/src/couch_workers/src/couch_workers_app.erl b/src/couch_workers/src/couch_workers_app.erl
new file mode 100644
index 0000000..734dc1b
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_app.erl
@@ -0,0 +1,26 @@
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_app).
+
+
+-behaviour(application).
+
+
+-export([
+ start/2,
+ stop/1
+]).
+
+
+start(_Type, []) ->
+ couch_workers_sup:start_link().
+
+
+stop([]) ->
+ ok.
diff --git a/src/couch_workers/src/couch_workers_fdb.erl b/src/couch_workers/src/couch_workers_fdb.erl
new file mode 100644
index 0000000..b2c1933
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_fdb.erl
@@ -0,0 +1,140 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_fdb).
+
+-export([
+ set_worker/4,
+ clear_worker/3,
+ get_worker/3,
+
+ set_worker_health/5,
+ get_worker_health/3,
+ get_workers_health/2,
+
+ set_workers_vs/2,
+ get_workers_vs/2,
+
+ get_workers/2
+]).
+
+
+%% Switch these to numbers eventually
+-define(COUCH_WORKERS, <<"couch_workers">>).
+-define(WORKERS, <<"workers">>).
+-define(WORKERS_VS, <<"workers_vs">>).
+-define(HEALTH, <<"health">>).
+
+-define(uint2bin(I), binary:encode_unsigned(I, little)).
+-define(bin2uint(I), binary:decode_unsigned(I, little)).
+-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}).
+
+%% (?COUCH_WORKERS, Role, ?WORKERS_VS) = VS
+%% (?COUCH_WORKERS, Role, ?WORKERS, Worker) = Opts
+%% (?COUCH_WORKERS, Role, ?HEALTH, Worker) = (VS, TStamp, Timeout)
+
+
+get_worker(Tx, Role, Worker) ->
+ Key = erlfdb_tuple:pack({?WORKERS, Worker}, workers_prefix(Tx, Role)),
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ <<_/binary>> = Val ->
+ binary_to_term(Val, [safe]);
+ not_found ->
+ not_found
+ end.
+
+
+set_worker(Tx, Role, Worker, Opts) ->
+ Key = erlfdb_tuple:pack({?WORKERS, Worker}, workers_prefix(Tx, Role)),
+ case get_worker(Tx, Role, Worker) of
+ not_found ->
+ set_workers_vs(Tx, Role);
+ #{} ->
+ ok
+ end,
+ erlfdb:wait(erlfdb:set(Tx, Key, term_to_binary(Opts))).
+
+
+clear_worker(Tx, Role, Worker) ->
+ Prefix = workers_prefix(Tx, Role),
+ Key = erlfdb_tuple:pack({?WORKERS, Worker}, Prefix),
+ case get_worker(Tx, Role, Worker) of
+ not_found ->
+ ok;
+ #{} ->
+ WPrefix = erlfdb_tuple:pack({?WORKERS, Worker}, Prefix),
+ erlfdb:clear_range_startswith(Tx, WPrefix),
+ HPrefix = erlfdb_tuple:pack({?HEALTH, Worker}, Prefix),
+ erlfdb:clear_range_startswith(Tx, HPrefix),
+ set_workers_vs(Tx, Role),
+ ok
+ end.
+
+
+get_worker_health(Tx, Role, Worker) ->
+ Prefix = workers_prefix(Tx, Role),
+ Key = erlfdb_tuple:pack({?HEALTH, Worker}, workers_prefix(Tx, Role)),
+ Val = erlfdb:wait(erlfdb:get(Tx, Key)),
+ {VS, TStamp, Timeout} = erlfdb_tuple:unpack(Val),
+ {VS, TStamp, Timeout}.
+
+
+get_workers_health(Tx, Role) ->
+ Prefix = workers_prefix(Tx, Role),
+ {Start, End} = erlfdb_tuple:range({?HEALTH}, Prefix),
+ RawKVs = erlfdb:wait(erlfdb:get_range(Tx, Start, End)),
+ KVs = lists:map(fun({K, V}) ->
+ {?HEALTH, Worker} = erlfdb_tuple:unpack(K, Prefix),
+ Opts = binary_to_term(V, [safe]),
+ {Worker, Opts}
+ end, RawKVs),
+ maps:from_list(KVs).
+
+
+set_worker_health(Tx, Role, Worker, TStamp, Timeout) when
+ is_integer(TStamp), is_integer(Timeout) ->
+ Prefix = workers_prefix(Tx, Role),
+ Key = erlfdb_tuple:pack({?HEALTH, Worker}, workers_prefix(Tx, Role)),
+ Val = erlfdb_tuple:pack({?UNSET_VS, TStamp, Timeout}),
+ erlfdb:wait(erlfdb:set(Tx, Key, Val)).
+
+
+get_workers_vs(Tx, Role) ->
+ % return a watch here eventually
+ Key = erlfdb_tuple:pack({?WORKERS_VS}, workers_prefix(Tx, Role)),
+ erlfdb:wait(erlfdb:get(Tx, Key)).
+
+
+set_workers_vs(Tx, Role) ->
+ % return a watch here eventually
+ Key = erlfdb_tuple:pack({?WORKERS_VS}, workers_prefix(Tx, Role)),
+ Val = erlfdb_tuple:pack({?UNSET_VS}),
+ erlfdb:wait(erlfdb:set(Tx, Key, Val)).
+
+
+get_workers(Tx, Role) ->
+ Prefix = workers_prefix(Tx, Role),
+ {Start, End} = erlfdb_tuple:range({?WORKERS}, Prefix),
+ RawKVs = erlfdb:wait(erlfdb:get_range(Tx, Start, End)),
+ KVs = lists:map(fun({K, V}) ->
+ {?WORKERS, Worker} = erlfdb_tuple:unpack(K, Prefix),
+ Opts = binary_to_term(V, [safe]),
+ {Worker, Opts}
+ end, RawKVs),
+ maps:from_list(KVs).
+
+
+workers_prefix(Tx, Role) ->
+ Root = erlfdb_directory:root(),
+ CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+ Prefix = erlfdb_directory:get_name(CouchDB),
+ erlfdb_tuple:pack({?COUCH_WORKERS, Role}, Prefix).
diff --git a/src/couch_workers/src/couch_workers_global.erl b/src/couch_workers/src/couch_workers_global.erl
new file mode 100644
index 0000000..efda3f0
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_global.erl
@@ -0,0 +1,228 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_global).
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0,
+ subscribe/3,
+ unsubscribe/1
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-define(ROLE_MONITOR_POLL_INTERVAL_MSEC, 5000).
+
+-define(CACHE, couch_workers_global_cache).
+-define(SUBSCRIBERS, couch_workers_global_subscribers).
+-define(MONITOR_PIDS, couch_workers_global_monitor_pids).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+subscribe(Role, Module, Pid) ->
+ gen_server:call(?MODULE, {subscribe, Role, Module, Pid}, infinity).
+
+
+unsubscribe(Ref) ->
+ gen_server:call(?MODULE, {unsubscribe, Ref}, infinity).
+
+
+get_workers(Role) ->
+ case ets:lookup(?CACHE, Role) of
+ [{Role, VS, Workers}] ->
+ {ok, VS, Workers};
+ [] ->
+ {error, not_found}
+ end.
+
+
+%% gen_server callbacks
+
+init(_) ->
+ EtsOpts = [protected, named_table],
+ % {Role, VS, Workers}
+ ets:new(?CACHE, EtsOpts ++ [{read_concurrency, true}]),
+ % {{Role, Ref}, Module, VS}
+ ets:new(?SUBSCRIBERS, EtsOpts ++ [ordered_set]),
+ % {Role, Pid}
+ ets:new(?MONITOR_PIDS, EtsOpts),
+ {ok, nil}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call({subscribe, Role, Mod, Pid}, From, St) ->
+ Ref = erlang:monitor(process, Pid),
+ subscribe_int(Role, Ref, Mod),
+ gen_server:reply(From, Ref),
+ case get_workers(Role) of
+ {ok, VS, Workers} ->
+ do_callback(Mod, Role, Workers, VS, Ref);
+ {error, not_found} ->
+ ok
+ end,
+ {noreply, St};
+
+handle_call({unsubscribe, Ref}, _From, St) ->
+ unsubscribe_int(Ref),
+ {reply, ok, St};
+
+handle_call({membership_update, Role, VS, Workers}, _From, St) ->
+ membership_update(Role, VS, Workers),
+ {reply, ok, St};
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) ->
+ unsubscribe_int(Ref),
+ {reply, ok, St};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+%% Utility functions
+
+membership_update(Role, VS, Workers) ->
+ true = ets:insert(?CACHE, {Role, VS, Workers}),
+ lists:foreach(fun
+ ({_Ref, _Mod, VS}) -> ok;
+ ({Ref, Mod, _OldVs}) -> do_callback(Mod, Role, Workers, VS, Ref)
+ end, find_subscribers(Role)).
+
+
+do_callback(nil, _, _, _, _) ->
+ % User didn't want a callback, they'll be polling the cache
+ ok;
+
+do_callback(Mod, Role, Workers, VS, Ref) ->
+ try
+ Mod:couch_workers_membership_update(Role, Workers, VS, Ref)
+ catch
+ Tag:Err ->
+ ErrMsg = "~p : failed when calling callback Mod:~p Role:~p ~p:~p",
+ couch_log:error(ErrMsg, [?MODULE, Mod, Role, Tag, Err])
+ end.
+
+
+subscribe_int(Role, Ref, Module) ->
+ true = ets:insert(?SUBSCRIBERS, {{Role, Ref}, Module, nil}),
+ maybe_start_role_monitor(Role).
+
+
+unsubscribe_int(Ref) ->
+ case find_subscriber_role(Ref) of
+ {ok, Role} ->
+ true = ets:delete(?SUBSCRIBERS, {Role, Ref}),
+ case find_subscribers(Role) of
+ [] ->
+ true = ets:delete(?CACHE, Role),
+ stop_role_monitor(Role);
+ [_] ->
+ ok
+ end;
+ {error, not_found} ->
+ ok
+ end.
+
+
+find_subscriber_role(Ref) ->
+ case ets:match(?SUBSCRIBERS, {{'$1', Ref}, '_'}) of
+ [] -> {error, not_found};
+ [[Role]] -> {ok, Role}
+ end.
+
+
+find_subscribers(Role) ->
+ % Using ETS partial key match in ordered set
+ Subscribers = ets:match(?SUBSCRIBERS, {{Role, '$1'}, '$2', '$3'}),
+ lists:map(fun([Ref, Mod, VS]) -> {Ref, Mod, VS} end, Subscribers).
+
+
+maybe_start_role_monitor(Role) ->
+ case ets:lookup(?MONITOR_PIDS, Role) of
+ [{_Role, _Pid}] ->
+ false;
+ [] ->
+ PollMSec = ?ROLE_MONITOR_POLL_INTERVAL_MSEC,
+ Self = self(),
+ Pid = spawn_link(fun() ->
+ role_monitor_loop(Role, nil, Self, PollMSec)
+ end),
+ true = ets:insert(?MONITOR_PIDS, {Role, Pid})
+ end.
+
+
+stop_role_monitor(Role) ->
+ [{_, Pid}] = ets:lookup(?MONITOR_PIDS, Role),
+ true = ets:delete(?MONITOR_PIDS, Role),
+ Ref = monitor(process, Pid),
+ unlink(Pid),
+ exit(Pid, kill),
+ receive {'DOWN', Ref, _, _, _} -> ok end.
+
+
+
+% Replace with a watch eventually but with a polling fallback
+% in case we make too many watches
+role_monitor_loop(Role, VS, ReportPid, PollMSec) ->
+ NewVS = case get_workers_vs(Role) of
+ VS ->
+ VS;
+ OtherVS when is_binary(OtherVS) ->
+ {VS1, Workers} = get_workers_and_vs(Role),
+ CallMsg = {membership_updated, Role, VS1, Workers},
+ ok = gen_server:call(ReportPid, CallMsg, infinity),
+ VS1
+ end,
+ timer:sleep(PollMSec),
+ role_monitor_loop(Role, NewVS, ReportPid, PollMSec).
+
+
+get_workers_vs(Role) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_workers_fdb:get_workers_vs(Tx, Role)
+ end).
+
+
+get_workers_and_vs(Role) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ VS = couch_workers_fdb:get_workers(Tx, Role),
+ Workers = couch_workers_fdb:get_workers(Tx, Role),
+ {VS, Workers}
+ end).
diff --git a/src/couch_workers/src/couch_workers_local.erl b/src/couch_workers/src/couch_workers_local.erl
new file mode 100644
index 0000000..24cd2fd
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_local.erl
@@ -0,0 +1,155 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_local).
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0,
+ worker_register/4,
+ worker_unregister/1
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-define(DEFAULT_HEALTH_TIMEOUT_SEC, 15).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+worker_register(Role, Id, Opts, Pid) ->
+ gen_server:call(?MODULE, {worker_register, Role, Id, Opts, Pid}, infinity).
+
+
+worker_unregister(Ref) ->
+ get_server:call(?MODULE, {worker_unregsiter, Ref}, infinity).
+
+
+init(_) ->
+ % {Ref, Role, Id, HealthPid}
+ ets:new(?MODULE, [protected, named_table]),
+ {ok, nil}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call({worker_register, Role, Id, Opts, Pid}, _From, St) ->
+ worker_register_int(Role, Id, Opts, Pid),
+ {noreply, St};
+
+handle_call({worker_unregister, Ref}, _From, St) ->
+ worker_unregister_int(Ref),
+ {reply, ok, St};
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) ->
+ worker_unregister(Ref),
+ {reply, ok, St};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+%% Utility functions
+
+worker_register_int(Role, Id, Pid, Opts) ->
+ case ets:match(?MODULE, {'$1', Role, Id, '_'}) of
+ [] ->
+ Ref = erlang:monitor(process, Pid),
+ ok = set_worker(Role, Id, Opts),
+ Timeout = maps:get(timeout, Opts, ?DEFAULT_HEALTH_TIMEOUT_SEC),
+ HPid = spawn_link(fun() ->
+ health_pinger_loop(Role, Id, Timeout)
+ end),
+ true = ets:insert(?MODULE, {Ref, Role, Id, HPid}),
+ Ref;
+ [[Ref]] ->
+ Ref
+ end.
+
+
+worker_unregister_int(Ref) ->
+ case ets:lookup(?MODULE, Ref) of
+ [{_, Role, Id, HealthPid}] ->
+ ok = clear_worker(Role, Id),
+ kill_health_pinger(HealthPid),
+ true = ets:delete(?MODULE, Ref),
+ ok;
+ [] ->
+ couch_log:error("~p : unknown worker reference ~p", [?MODULE, Ref]),
+ ok
+ end.
+
+
+now_sec() ->
+ {Mega, Sec, _Micro} = os:timestamp(),
+ Mega * 1000000 + Sec.
+
+
+kill_health_pinger(Pid) when is_pid(Pid) ->
+ Ref = monitor(process, Pid),
+ unlink(Pid),
+ exit(Pid, kill),
+ receive {'DOWN', Ref, _, _, _} -> ok end.
+
+
+health_pinger_loop(Role, Id, Timeout) ->
+ set_worker_health(Role, Id, now_sec(), Timeout),
+ % todo: dd jitter here
+ timer:sleep(max(10, Timeout * 1000 / 3)),
+ health_pinger_loop(Role, Id, Timeout).
+
+
+set_worker_health(Role, Worker, TStamp, Timeout) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_workers_fdb:set_worker_health(Tx, Role, Worker, TStamp, Timeout)
+ end).
+
+
+set_worker(Role, Worker, Opts) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_workers_fdb:set_worker(Tx, Role, Worker, Opts)
+ end),
+ ok.
+
+
+clear_worker(Role, Worker) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_workers_fdb:clear_worker(Tx, Role, Worker)
+ end),
+ ok.
diff --git a/src/couch_workers/src/couch_workers_sup.erl b/src/couch_workers/src/couch_workers_sup.erl
new file mode 100644
index 0000000..15a0a3b
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_sup.erl
@@ -0,0 +1,49 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1
+]).
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+ Flags = #{
+ strategy => one_for_one,
+ intensity => 1,
+ period => 5
+ },
+ Children = [
+ #{
+ id => couch_workers_server,
+ start => {couch_workers_global, start_link, []}
+ },
+ #{
+ id => couch_workers_server,
+ start => {couch_workers_local, start_link, []}
+ }
+ ],
+ {ok, {Flags, Children}}.