You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2021/08/24 21:54:54 UTC
[couchdb-ioq] 01/07: WIP: IOQ2 per shard/user
This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git
commit 54cb8abf851ef905d1620a198807aafe243f6c42
Author: Russell Branca <ch...@apache.org>
AuthorDate: Thu Feb 28 18:48:21 2019 +0000
WIP: IOQ2 per shard/user
---
src/ioq.erl | 24 ++++-
src/ioq_opener.erl | 256 ++++++++++++++++++++++++++++++++++++++++++++++++++++
src/ioq_server2.erl | 46 ++++++----
src/ioq_sup.erl | 17 ++--
test/ioq_tests.erl | 7 +-
5 files changed, 318 insertions(+), 32 deletions(-)
diff --git a/src/ioq.erl b/src/ioq.erl
index 160a448..2530dba 100644
--- a/src/ioq.erl
+++ b/src/ioq.erl
@@ -15,7 +15,12 @@
get_disk_queues/0, get_osproc_queues/0, get_osproc_requests/0,
get_disk_counters/0, get_disk_concurrency/0]).
-export([
- ioq2_enabled/0
+ ioq2_enabled/0,
+ fetch_pid_for/1,
+ fetch_pid_for/2,
+ fetch_pid_for/3,
+ get_pid_for/1,
+ set_pid_for/2
]).
-define(APPS, [config, folsom, couch_stats, ioq]).
@@ -72,5 +77,20 @@ get_osproc_requests() ->
gen_server:call(ioq_osq, get_requests).
ioq2_enabled() ->
- config:get_boolean("ioq2", "enabled", false).
+ config:get_boolean("ioq2", "enabled", true).
+
+%% @doc Fetch the IOQ2 server pid for DbName on behalf of the calling process.
+%% ioq_opener exports only fetch_pid_for/2 and fetch_pid_for/3 (its arity-1
+%% clause is commented out), so calling ioq_opener:fetch_pid_for/1 would fail
+%% with undef. Delegate to the arity-2 form with self() instead, which is what
+%% the commented-out arity-1 clause in ioq_opener did.
+fetch_pid_for(DbName) ->
+    ioq_opener:fetch_pid_for(DbName, self()).
+
+%% @doc Fetch the IOQ2 server pid for DbName, tied to FdPid.
+%% Thin wrapper around ioq_opener:fetch_pid_for/2.
+fetch_pid_for(DbName, FdPid) ->
+    ioq_opener:fetch_pid_for(DbName, FdPid).
+
+%% @doc Fetch the IOQ2 server pid for DbName / UserCtx, tied to FdPid.
+%% Thin wrapper around ioq_opener:fetch_pid_for/3.
+fetch_pid_for(DbName, UserCtx, FdPid) ->
+    ioq_opener:fetch_pid_for(DbName, UserCtx, FdPid).
+
+%% @doc Look up the IOQ2 pid recorded for FdPid.
+%% Thin wrapper around ioq_opener:get_pid_for/1, which reads the calling
+%% process's dictionary.
+get_pid_for(FdPid) ->
+    ioq_opener:get_pid_for(FdPid).
+
+%% @doc Record IOQPid as the IOQ2 pid for FdPid.
+%% Thin wrapper around ioq_opener:set_pid_for/2, which writes to the calling
+%% process's dictionary.
+set_pid_for(FdPid, IOQPid) ->
+    ioq_opener:set_pid_for(FdPid, IOQPid).
diff --git a/src/ioq_opener.erl b/src/ioq_opener.erl
new file mode 100644
index 0000000..d85991e
--- /dev/null
+++ b/src/ioq_opener.erl
@@ -0,0 +1,256 @@
+-module(ioq_opener).
+-behavior(gen_server).
+
+
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+]).
+-export([
+ start_link/0,
+ %%fetch_pid_for/1,
+ fetch_pid_for/2,
+ fetch_pid_for/3,
+ get_pid_for/1,
+ set_pid_for/2,
+ get_ioq_pids/0,
+ get_pid_idx/0,
+ get_monitor_idx/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(BY_USER, by_user).
+-define(BY_SHARD, by_shard).
+-define(DEFAULT_DISPATCH, ?BY_SHARD).
+-define(PDICT_MARKER, ioq_pid_for).
+
+
+-record(st, {
+ idle = [] :: [{erlang:timestamp(), pid()}],
+ pid_idx :: khash:khash(),
+ monitors :: khash:khash(),
+ dispatch :: by_shard | by_user | undefined
+ }).
+
+
+%% HACK: experiment to allow spawning IOQ2 pids prior to spawning the
+%% associated couch_file pids
+%%fetch_pid_for(DbName) when is_binary(DbName) ->
+%% fetch_pid_for(DbName, self()).
+
+
+%% TODO: clean up the overloaded arities once the experiments have concluded
+%%fetch_pid_for(DbName, undefined) when is_binary(DbName) ->
+%% fetch_pid_for(DbName, undefined, self());
+%%fetch_pid_for(DbName, #user_ctx{}=Ctx) when is_binary(DbName) ->
+%% fetch_pid_for(DbName, Ctx, self());
+%% @doc Same as fetch_pid_for/3 with no user context (UserCtx = undefined).
+fetch_pid_for(DbName, FdPid) when is_binary(DbName), is_pid(FdPid) ->
+    fetch_pid_for(DbName, undefined, FdPid).
+
+
+%% @doc Synchronously send a {fetch, DbName, UserCtx, FdPid} request to the
+%% locally registered ioq_opener server and return its reply. The call uses
+%% an infinite timeout.
+fetch_pid_for(DbName, UserCtx, FdPid) when is_binary(DbName), is_pid(FdPid) ->
+    Request = {fetch, DbName, UserCtx, FdPid},
+    gen_server:call(?MODULE, Request, infinity).
+
+
+%% @doc Read the IOQ pid stored in the calling process's dictionary for Key,
+%% where Key is either a db name binary or a pid. `undefined` maps to
+%% `undefined` without touching the dictionary.
+get_pid_for(undefined) ->
+    undefined;
+get_pid_for(Key) when is_binary(Key); is_pid(Key) ->
+    erlang:get(pid_for_key(Key)).
+
+
+%% @doc Store IOQPid in the calling process's dictionary under Key (a db name
+%% binary or a pid). An `undefined` IOQPid is ignored. Always returns ok.
+set_pid_for(_Key, undefined) ->
+    ok;
+set_pid_for(Key, IOQPid) when is_pid(IOQPid), (is_binary(Key) orelse is_pid(Key)) ->
+    erlang:put(pid_for_key(Key), IOQPid),
+    ok.
+
+
+%% Build the process dictionary key shared by get_pid_for/1 and set_pid_for/2.
+pid_for_key(DbName) when is_binary(DbName) ->
+    %% HACK: use the same shard format as per #ioq_request{} to post facto
+    %% associate an IOQ pid with a dbname for when we set the IOQ2 pid prior
+    %% to having a db handle
+    {?PDICT_MARKER, filename:rootname(DbName)};
+pid_for_key(Pid) when is_pid(Pid) ->
+    {?PDICT_MARKER, Pid}.
+
+
+%% @doc Return the opener's pid index as a list (khash:to_list/1 of pid_idx).
+get_pid_idx() ->
+    gen_server:call(?MODULE, get_pid_idx, infinity).
+
+
+%% @doc Return the opener's monitor index as a list (khash:to_list/1 of
+%% monitors).
+get_monitor_idx() ->
+    gen_server:call(?MODULE, get_monitor_idx, infinity).
+
+
+%% @doc Return the pid-valued keys of the pid index. The index maps both
+%% name -> pid and pid -> name, so only the entries keyed by a pid are kept.
+%% The result is reversed to keep the order produced by a prepending fold.
+get_ioq_pids() ->
+    PidKeys = [Key || {Key, _Value} <- get_pid_idx(), is_pid(Key)],
+    lists:reverse(PidKeys).
+
+
+%% @doc Start the opener, registered locally under the module name.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+%% gen_server init: trap exits (IOQ servers are started with start_link from
+%% this process), create the two khash indexes, and read the dispatch mode
+%% from the "ioq.opener" config section.
+init([]) ->
+    process_flag(trap_exit, true),
+    {ok, PidIdx} = khash:new(),
+    {ok, Monitors} = khash:new(),
+    Configured = config:get("ioq.opener", "dispatch", undefined),
+    {ok, #st{
+        pid_idx = PidIdx,
+        monitors = Monitors,
+        dispatch = dispatch_mode(Configured)
+    }}.
+
+
+%% Map the configured dispatch string to its mode; anything else (including
+%% an unset value) falls back to the default.
+dispatch_mode("by_shard") -> ?BY_SHARD;
+dispatch_mode("by_user") -> ?BY_USER;
+dispatch_mode(_Other) -> ?DEFAULT_DISPATCH.
+
+
+%% gen_server call handler.
+%%
+%% {fetch, DbName, UserCtx, FdPid}
+%%     Reply with the IOQ2 server pid for the dispatch key: the user name
+%%     under by_user dispatch, the DbName under by_shard dispatch. A new
+%%     ioq_server2 is started and indexed when the key has none yet, and the
+%%     caller (FdPid, or the calling process when FdPid is undefined) is
+%%     monitored for that IOQ pid.
+%%     Under by_user dispatch a request without a user context replies
+%%     `undefined`. A throw here would be taken by gen_server as the
+%%     callback's return value and terminate the opener with
+%%     bad_return_value. `undefined` is already accepted by set_pid_for/2
+%%     and returned by get_pid_for/1 to mean "no dedicated IOQ pid".
+%% get_pid_idx / get_monitor_idx
+%%     Reply with the corresponding khash index as a list.
+%% Any other request replies ok.
+handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) ->
+    case UserCtx of
+        #user_ctx{name=Name} ->
+            Caller = fetch_caller(FdPid, From),
+            {reply, fetch_ioq_pid(St, {user, Name}, Caller), St};
+        undefined ->
+            %% TODO: support unknown user
+            {reply, undefined, St}
+    end;
+handle_call({fetch, DbName, _UserCtx, FdPid}, From, #st{dispatch=?BY_SHARD}=St) ->
+    %% TODO: DbName = drop_compact_ext(DbName0),
+    Caller = fetch_caller(FdPid, From),
+    {reply, fetch_ioq_pid(St, {shard, DbName}, Caller), St};
+handle_call(get_pid_idx, _From, #st{}=St) ->
+    {reply, khash:to_list(St#st.pid_idx), St};
+handle_call(get_monitor_idx, _From, #st{}=St) ->
+    {reply, khash:to_list(St#st.monitors), St};
+handle_call(_, _From, St) ->
+    {reply, ok, St}.
+
+
+%% Pick the process to monitor for a fetch request. From is {Pid, Tag}; only
+%% the pid can be handed to erlang:monitor/2, so unwrap it when no FdPid was
+%% supplied.
+fetch_caller(undefined, {CallerPid, _Tag}) ->
+    CallerPid;
+fetch_caller(FdPid, _From) when is_pid(FdPid) ->
+    FdPid.
+
+
+%% Return the IOQ pid indexed under Key, starting an ioq_server2 for Target
+%% and indexing it in both directions (Key -> Pid, Pid -> Key) when there is
+%% none, then register Caller as a user of that pid.
+fetch_ioq_pid(#st{pid_idx=PidIdx, monitors=Mons}, {_Type, Key}=Target, Caller) ->
+    IOQPid = case khash:get(PidIdx, Key, not_found) of
+        not_found ->
+            {ok, Pid} = ioq_server2:start_link(Target),
+            khash:put(PidIdx, Key, Pid),
+            khash:put(PidIdx, Pid, Key),
+            Pid;
+        Pid ->
+            Pid
+    end,
+    ok = add_monitor(Mons, Caller, IOQPid),
+    IOQPid.
+
+
+%% No casts are handled by this server; whatever arrives is ignored and the
+%% state is returned unchanged.
+handle_cast(_Request, St) ->
+    {noreply, St}.
+
+
+%% Out-of-band messages.
+%%
+%% 'DOWN': a monitored caller went away. Drop its monitor; when that was the
+%%     last monitor for the IOQ pid, also remove the pid's two entries from
+%%     the pid index.
+%% 'EXIT': a linked process exited. If it is indexed, remove both of its pid
+%%     index entries.
+%% Anything else is ignored.
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) ->
+    case drop_monitor(St#st.monitors, Ref) of
+        {IOQPid, []} ->
+            PidIdx = St#st.pid_idx,
+            Name = khash:get(PidIdx, IOQPid), %% TODO: assert found?
+            unindex_ioq_pid(PidIdx, IOQPid, Name);
+        {_IOQPid, _Remaining} ->
+            ok
+    end,
+    {noreply, St};
+handle_info({'EXIT', Pid, _Reason}, St) ->
+    PidIdx = St#st.pid_idx,
+    case khash:get(PidIdx, Pid, not_found) of
+        not_found ->
+            %% TODO: shouldn't happen, throw error?
+            ok;
+        Name ->
+            unindex_ioq_pid(PidIdx, Pid, Name)
+    end,
+    {noreply, St};
+handle_info(_Info, St) ->
+    {noreply, St}.
+
+
+%% Remove both directions (pid -> name, name -> pid) of a pid index entry.
+unindex_ioq_pid(PidIdx, Pid, Name) ->
+    khash:del(PidIdx, Pid),
+    khash:del(PidIdx, Name).
+
+
+%% Nothing to clean up explicitly on shutdown.
+terminate(_Why, _State) ->
+    ok.
+
+
+%% No state migration is needed across code upgrades; keep the state as is.
+code_change(_FromVsn, State, _Extra) ->
+    {ok, State}.
+
+
+%% Register FdPid as a user of IOQPid in the monitor index Mons.
+%%
+%% Mons holds three kinds of entries:
+%%   Ref             -> {FdPid, IOQPid}
+%%   {FdPid, IOQPid} -> Ref
+%%   IOQPid          -> [Ref]   (all monitors currently using that IOQ pid)
+%%
+%% FdPid is monitored at most once per {FdPid, IOQPid} pair. Always returns
+%% ok.
+add_monitor(Mons, FdPid, IOQPid) ->
+    Ref = ensure_monitor_ref(Mons, {FdPid, IOQPid}),
+    Refs = khash:get(Mons, IOQPid, []),
+    case lists:member(Ref, Refs) of
+        true ->
+            ok;
+        false ->
+            khash:put(Mons, IOQPid, [Ref | Refs])
+    end,
+    ok.
+
+
+%% Return the monitor reference for PidKey, creating the monitor and both of
+%% its index entries on first use. The reference itself must be the result:
+%% the last expression used to be khash:put/3, which yields ok, so the ref
+%% list for the IOQ pid ended up holding `ok` and drop_monitor/2 could never
+%% empty it.
+ensure_monitor_ref(Mons, {FdPid, _IOQPid}=PidKey) ->
+    case khash:get(Mons, PidKey, not_found) of
+        not_found ->
+            Ref = erlang:monitor(process, FdPid),
+            khash:put(Mons, Ref, PidKey),
+            khash:put(Mons, PidKey, Ref),
+            Ref;
+        Ref ->
+            Ref
+    end.
+
+
+%% Remove the monitor Ref from the monitor index Mons.
+%%
+%% Returns {IOQPid, RemainingRefs}. When no refs remain for the IOQ pid it is
+%% unlinked, dropped from the index and sent an `idle` exit signal, and the
+%% result is {IOQPid, []}. Throws `unexpected` when Ref, or the IOQ pid it
+%% points at, is not in the index; nothing is deleted in that case.
+drop_monitor(Mons, Ref) when is_reference(Ref) ->
+    drop_monitor_key(Mons, Ref, khash:get(Mons, Ref, not_found)).
+
+
+%% Second step: resolve the ref list of the IOQ pid the monitor belongs to.
+drop_monitor_key(_Mons, _Ref, not_found) ->
+    %% TODO: shouldn't happen
+    throw(unexpected);
+drop_monitor_key(Mons, Ref, {_FdPid, IOQPid}=PidKey) ->
+    case khash:get(Mons, IOQPid, not_found) of
+        not_found ->
+            %% TODO: shouldn't happen
+            throw(unexpected);
+        Refs ->
+            khash:del(Mons, Ref),
+            khash:del(Mons, PidKey),
+            drop_monitor_refs(Mons, IOQPid, lists:delete(Ref, Refs))
+    end.
+
+
+%% Final step: either retire the now unused IOQ pid or store the shorter
+%% ref list.
+drop_monitor_refs(Mons, IOQPid, []) ->
+    unlink(IOQPid),
+    khash:del(Mons, IOQPid),
+    exit(IOQPid, idle),
+    {IOQPid, []};
+drop_monitor_refs(Mons, IOQPid, Remaining) ->
+    khash:put(Mons, IOQPid, Remaining),
+    {IOQPid, Remaining}.
+
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index dadd44d..b3dc24f 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -23,6 +23,8 @@
code_change/3
]).
-export([
+ start_link/0,
+ start_link/1,
start_link/3,
call/3,
pcall/1,
@@ -108,21 +110,17 @@ call(Fd, Msg, Dimensions) ->
[couchdb, io_queue2, RW, bypassed_count]),
gen_server:call(Fd, Msg, infinity);
_ ->
- DispatchStrategy = config:get(
- "ioq2", "dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER),
- Server = case DispatchStrategy of
- ?DISPATCH_RANDOM ->
- SID = rand:uniform(erlang:system_info(schedulers)),
- ?SERVER_ID(SID);
- ?DISPATCH_FD_HASH ->
- NumSchedulers = erlang:system_info(schedulers),
- SID = 1 + (erlang:phash2(Fd) rem NumSchedulers),
- ?SERVER_ID(SID);
- ?DISPATCH_SINGLE_SERVER ->
- ?SERVER_ID(1);
- _ ->
- SID = erlang:system_info(scheduler_id),
- ?SERVER_ID(SID)
+ Server = case ioq_opener:get_pid_for(Fd) of
+ undefined ->
+ %%case ioq_opener:get_pid_for(Req#ioq_request.shard) of
+ %% undefined ->
+ %% ioq_server2;
+ %% IOQPid ->
+ %% IOQPid
+ %%end;
+ ioq_server2;
+ IOQPid ->
+ IOQPid
end,
gen_server:call(Server, Req, infinity)
end.
@@ -266,6 +264,18 @@ update_config() ->
gen_server:call(?SERVER_ID(1), update_config, infinity).
+%% @doc Start the server registered locally under the module name.
+start_link() ->
+    start_link(?MODULE).
+
+
+%% @doc Start an IOQ2 server.
+%% Given the module name, the server is registered locally under that name
+%% and initialised with {global, Module}. Given {user, Name} or
+%% {shard, Name}, an unregistered server is started and initialised with
+%% that tuple.
+start_link(?MODULE) ->
+    InitArgs = [{global, ?MODULE}],
+    gen_server:start_link({local, ?MODULE}, ?MODULE, InitArgs, []);
+start_link({Kind, _Name}=Owner) when Kind =:= user; Kind =:= shard ->
+    gen_server:start_link(?MODULE, [Owner], []).
+
+
start_link(Name, SID, Bind) ->
Options = case Bind of
true -> [{scheduler, SID}];
@@ -274,7 +284,7 @@ start_link(Name, SID, Bind) ->
gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options).
-init([Name, SID]) ->
+init([{Type, Name}]) ->
{ok, HQ} = hqueue:new(),
{ok, Reqs} = khash:new(),
{ok, Waiters} = khash:new(),
@@ -283,7 +293,7 @@ init([Name, SID]) ->
reqs = Reqs,
waiters = Waiters,
server_name = Name,
- scheduler_id = SID
+ scheduler_id = Type
},
{ok, update_config_int(State)}.
@@ -630,7 +640,7 @@ mock_server(Config) ->
meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
Default
end),
- {ok, State} = ioq_server2:init([?SERVER_ID(1), 1]),
+ {ok, State} = ioq_server2:init([{global, ?SERVER_ID(1)}]),
State.
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
index 8bfda77..973886a 100644
--- a/src/ioq_sup.erl
+++ b/src/ioq_sup.erl
@@ -15,7 +15,7 @@
-vsn(1).
-behaviour(config_listener).
-export([start_link/0, init/1]).
--export([get_ioq2_servers/0]).
+-export([get_ioq2_servers/0, get_ioq2_servers_new/0]).
-export([handle_config_change/5, handle_config_terminate/3]).
-export([processes/1]).
@@ -28,26 +28,20 @@ start_link() ->
init([]) ->
ok = ioq_config_listener:subscribe(),
- IOQ2Children = ioq_server2_children(),
{ok, {
{one_for_one, 5, 10},
[
?CHILD_WITH_ARGS(config_listener_mon, worker, [?MODULE, nil]),
+ ?CHILD(ioq_opener, worker),
?CHILD(ioq_server, worker),
+ ?CHILD(ioq_server2, worker),
?CHILD(ioq_osq, worker)
- | IOQ2Children
]
}}.
-ioq_server2_children() ->
- Bind = config:get_boolean("ioq2", "bind_to_schedulers", false),
- ioq_server2_children(erlang:system_info(schedulers), Bind).
-ioq_server2_children(Count, Bind) ->
- lists:map(fun(I) ->
- Name = list_to_atom("ioq_server_" ++ integer_to_list(I)),
- {Name, {ioq_server2, start_link, [Name, I, Bind]}, permanent, 5000, worker, [Name]}
- end, lists:seq(1, Count)).
+%% @doc List the IOQ2 servers: the registered name ioq_server2 followed by
+%% the pids reported by ioq_opener:get_ioq_pids/0.
+get_ioq2_servers_new() ->
+    OpenerPids = ioq_opener:get_ioq_pids(),
+    [ioq_server2 | OpenerPids].
get_ioq2_servers() ->
lists:map(fun(I) ->
@@ -92,3 +86,4 @@ filter_children(RegExp) ->
_ -> false
end
end, supervisor:which_children(?MODULE)).
+
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl
index b6b7bad..c2502c5 100644
--- a/test/ioq_tests.erl
+++ b/test/ioq_tests.erl
@@ -40,7 +40,12 @@ instantiate({_, S}) ->
check_call(S, make_ref(), priority(IOClass, Shard))
end, shards())
end, io_classes())},
- ?_assertEqual(20, ioq:set_disk_concurrency(10)),
+ case ioq:ioq2_enabled() of
+ true ->
+ ?_assertEqual(1, ioq:set_disk_concurrency(10));
+ false ->
+ ?_assertEqual(20, ioq:set_disk_concurrency(10))
+ end,
?_assertError(badarg, ioq:set_disk_concurrency(0)),
?_assertError(badarg, ioq:set_disk_concurrency(-1)),
?_assertError(badarg, ioq:set_disk_concurrency(foo))].