You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ii...@apache.org on 2019/02/19 19:37:27 UTC
[couchdb-ioq] branch master updated: Reconfigure IOQ on config
update
This is an automated email from the ASF dual-hosted git repository.
iilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git
The following commit(s) were added to refs/heads/master by this push:
new b5b801a Reconfigure IOQ on config update
new 89fe01f Merge pull request #8 from cloudant/update_handle_config_terminate
b5b801a is described below
commit b5b801abc02f820ad4a6ada202270015eafc1ba9
Author: ILYA Khlopotov <ii...@apache.org>
AuthorDate: Fri Feb 15 13:01:22 2019 +0000
Reconfigure IOQ on config update
---
src/ioq_sup.erl | 45 ++++++++++++++++++++
test/ioq_config_tests.erl | 105 ++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 150 insertions(+)
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
index 7ea6284..3e9a494 100644
--- a/src/ioq_sup.erl
+++ b/src/ioq_sup.erl
@@ -12,11 +12,16 @@
-module(ioq_sup).
-behaviour(supervisor).
+-vsn(1).
+-behaviour(config_listener).
-export([start_link/0, init/1]).
-export([get_ioq2_servers/0]).
+-export([handle_config_change/5, handle_config_terminate/3]).
+-export([processes/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+-define(CHILD_WITH_ARGS(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
@@ -27,6 +32,7 @@ init([]) ->
{ok, {
{one_for_one, 5, 10},
[
+ ?CHILD_WITH_ARGS(config_listener_mon, worker, [?MODULE, nil]),
?CHILD(ioq_server, worker),
?CHILD(ioq_osq, worker)
| IOQ2Children
@@ -47,3 +53,42 @@ get_ioq2_servers() ->
lists:map(fun(I) ->
list_to_atom("ioq_server_" ++ integer_to_list(I))
end, lists:seq(1, erlang:system_info(schedulers))).
+
+handle_config_change("ioq", _Key, _Val, _Persist, St) ->
+ gen_server:cast(ioq_server, update_config),
+ {ok, St};
+handle_config_change("ioq2" ++ _, _Key, _Val, _Persist, St) ->
+ lists:foreach(fun({_Id, Pid}) ->
+ gen_server:call(Pid, update_config)
+ end, processes(ioq2)),
+ {ok, St};
+handle_config_change(_Sec, _Key, _Val, _Persist, St) ->
+ {ok, St}.
+
+handle_config_terminate(_Server, _Reason, _State) ->
+ gen_server:cast(ioq_server, update_config),
+ spawn(fun() ->
+ lists:foreach(fun({_Id, Pid}) ->
+ gen_server:call(Pid, update_config)
+ end, processes(ioq2))
+ end),
+ ok.
+
+processes(ioq2) ->
+ filter_children("^ioq_server_.*$");
+processes(ioq) ->
+ filter_children("^ioq_server$");
+processes(config_listener_mon) ->
+ filter_children("^config_listener_mon$");
+processes(Arg) ->
+ {error, [
+ {expected_one_of, [ioq, ioq2, config_listener_mon]},
+ {got, Arg}]}.
+
+filter_children(RegExp) ->
+ lists:filtermap(fun({Id, P, _, _}) ->
+ case re:run(atom_to_list(Id), RegExp) of
+ {match, _} -> {true, {Id, P}};
+ _ -> false
+ end
+ end, supervisor:which_children(?MODULE)).
diff --git a/test/ioq_config_tests.erl b/test/ioq_config_tests.erl
index d3fee81..2a5ccc3 100644
--- a/test/ioq_config_tests.erl
+++ b/test/ioq_config_tests.erl
@@ -30,6 +30,111 @@
}
]).
+config_update_test_() ->
+ {
+ "Test config updates",
+ {
+ foreach,
+ fun() -> test_util:start_applications([config, ioq]) end,
+ fun test_util:stop_applications/1,
+ [
+ fun t_restart_config_listener/1,
+ fun t_update_ioq_config/1,
+ fun t_update_ioq2_config/1,
+ fun t_update_ioq_config_on_listener_restart/1,
+ fun t_update_ioq2_config_on_listener_restart/1
+ ]
+ }
+}.
+
+t_restart_config_listener(_) ->
+ ?_test(begin
+ [{_, ConfigMonitor}] = ioq_sup:processes(config_listener_mon),
+ ?assert(is_process_alive(ConfigMonitor)),
+ test_util:stop_sync(ConfigMonitor),
+ ?assertNot(is_process_alive(ConfigMonitor)),
+ NewConfigMonitor = test_util:wait(fun() ->
+ case ioq_sup:processes(config_listener_mon) of
+ [] -> wait;
+ [{_, Pid}] -> Pid
+ end
+ end),
+ ?assert(is_process_alive(NewConfigMonitor))
+ end).
+
+t_update_ioq_config(_) ->
+ ?_test(begin
+ [{_, IoqServer}] = ioq_sup:processes(ioq),
+ gen_server:call(IoqServer, {set_concurrency, 10}),
+ ?assertEqual(10, gen_server:call(IoqServer, get_concurrency)),
+ ?assert(is_process_alive(IoqServer)),
+ config:set("ioq", "concurrency", "200", false),
+ ?assertNotEqual(timeout, test_util:wait(fun() ->
+ case gen_server:call(IoqServer, get_concurrency) of
+ 200 -> 200;
+ _ -> wait
+ end
+ end)),
+ ?assert(is_process_alive(IoqServer))
+ end).
+
+t_update_ioq_config_on_listener_restart(_) ->
+ ?_test(begin
+ [{_, IoqServer}] = ioq_sup:processes(ioq),
+ DefaultConcurrency = gen_server:call(IoqServer, get_concurrency),
+ gen_server:call(IoqServer, {set_concurrency, 10}),
+ ?assertEqual(10, gen_server:call(IoqServer, get_concurrency)),
+ ?assert(is_process_alive(IoqServer)),
+
+ [{_, ConfigMonitor}] = ioq_sup:processes(config_listener_mon),
+ ?assert(is_process_alive(ConfigMonitor)),
+ test_util:stop_sync(ConfigMonitor),
+
+ ?assertNotEqual(timeout, test_util:wait(fun() ->
+ case gen_server:call(IoqServer, get_concurrency) of
+ DefaultConcurrency -> ok;
+ _ -> wait
+ end
+ end)),
+ ?assert(is_process_alive(IoqServer))
+ end).
+
+t_update_ioq2_config(_) ->
+ ?_test(begin
+ [{_, IoqServer} | _] = ioq_sup:processes(ioq2),
+ gen_server:call(IoqServer, {set_concurrency, 10}),
+ ?assertEqual(10, gen_server:call(IoqServer, get_concurrency)),
+ ?assert(is_process_alive(IoqServer)),
+ config:set("ioq2", "concurrency", "200", false),
+ ?assertNotEqual(timeout, test_util:wait(fun() ->
+ case gen_server:call(IoqServer, get_concurrency) of
+ 200 -> 200;
+ _ -> wait
+ end
+ end)),
+ ?assert(is_process_alive(IoqServer))
+ end).
+
+t_update_ioq2_config_on_listener_restart(_) ->
+ ?_test(begin
+ [{_, IoqServer} | _] = ioq_sup:processes(ioq2),
+ DefaultConcurrency = gen_server:call(IoqServer, get_concurrency),
+ gen_server:call(IoqServer, {set_concurrency, 10}),
+ ?assertEqual(10, gen_server:call(IoqServer, get_concurrency)),
+ ?assert(is_process_alive(IoqServer)),
+
+ [{_, ConfigMonitor}] = ioq_sup:processes(config_listener_mon),
+ ?assert(is_process_alive(ConfigMonitor)),
+ test_util:stop_sync(ConfigMonitor),
+
+ ?assertNotEqual(timeout, test_util:wait(fun() ->
+ case gen_server:call(IoqServer, get_concurrency) of
+ DefaultConcurrency -> ok;
+ _ -> wait
+ end
+ end)),
+ ?assert(is_process_alive(IoqServer))
+ end).
priorities_test_() ->
{ok, ShardP} = ioq_config:build_shard_priorities(?SHARDS_CONFIG),