You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2021/08/24 21:54:56 UTC
[couchdb-ioq] 03/07: Support finding ioq pids by #ioq_request{}
This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git
commit 034db83e7a56a5a5c14ec834c5a3160a960f940a
Author: Russell Branca <ch...@apache.org>
AuthorDate: Wed Mar 6 23:03:55 2019 +0000
Support finding ioq pids by #ioq_request{}
---
src/ioq_opener.erl | 53 +++++++++++++++++++++++++++++++++++++++++++++--------
src/ioq_server2.erl | 28 ++++++++++++----------------
test/ioq_tests.erl | 6 ++++--
3 files changed, 61 insertions(+), 26 deletions(-)
diff --git a/src/ioq_opener.erl b/src/ioq_opener.erl
index d85991e..5bcf065 100644
--- a/src/ioq_opener.erl
+++ b/src/ioq_opener.erl
@@ -12,7 +12,7 @@
]).
-export([
start_link/0,
- %%fetch_pid_for/1,
+ fetch_pid_for/1,
fetch_pid_for/2,
fetch_pid_for/3,
get_pid_for/1,
@@ -24,10 +24,14 @@
-include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
-define(BY_USER, by_user).
-define(BY_SHARD, by_shard).
+-define(BY_CLASS, by_class).
+-define(BY_FD, by_fd).
+-define(BY_DB, by_db).
-define(DEFAULT_DISPATCH, ?BY_SHARD).
-define(PDICT_MARKER, ioq_pid_for).
@@ -36,7 +40,7 @@
idle = [] :: [{erlang:timestamp(), pid()}],
pid_idx :: khash:khash(),
monitors :: khash:khash(),
- dispatch :: by_shard | by_user | undefined
+ dispatch :: ?BY_SHARD | ?BY_DB | ?BY_USER | ?BY_CLASS | ?BY_FD | undefined
}).
@@ -46,6 +50,10 @@
%% fetch_pid_for(DbName, self()).
+fetch_pid_for(#ioq_request{}=Req) ->
+ gen_server:call(?MODULE, {fetch, Req}, infinity).
+
+
%% TODO: cleanup the overloaded arity once experiments concluded
%%fetch_pid_for(DbName, undefined) when is_binary(DbName) ->
%% fetch_pid_for(DbName, undefined, self());
@@ -111,8 +119,11 @@ init([]) ->
{ok, Monitors} = khash:new(),
Dispatch = case config:get("ioq.opener", "dispatch", undefined) of
"by_shard" -> ?BY_SHARD;
- "by_user" -> ?BY_USER;
- _ -> ?DEFAULT_DISPATCH
+ "by_db" -> ?BY_DB;
+ "by_user" -> ?BY_USER;
+ "by_class" -> ?BY_CLASS;
+ "by_fd" -> ?BY_FD;
+ _ -> ?DEFAULT_DISPATCH
end,
St = #st{
pid_idx = PidIdx,
@@ -122,6 +133,30 @@ init([]) ->
{ok, St}.
+handle_call({fetch, #ioq_request{}=Req}, _From, #st{dispatch=Dispatch}=St) ->
+ Key = case Dispatch of
+ ?BY_SHARD ->
+ Req#ioq_request.shard;
+ ?BY_DB ->
+ Req#ioq_request.db;
+ ?BY_USER ->
+ Req#ioq_request.user;
+ ?BY_CLASS ->
+ Req#ioq_request.class;
+ ?BY_FD ->
+ {fd, Req#ioq_request.fd}
+ end,
+ IOQPid = case khash:get(St#st.pid_idx, Key, not_found) of
+ not_found ->
+ {ok, Pid} = ioq_server2:start_link({Dispatch, Key}),
+ khash:put(St#st.pid_idx, Key, Pid),
+ khash:put(St#st.pid_idx, Pid, Key),
+ Pid;
+ Pid ->
+ Pid
+ end,
+ ok = add_monitor(St#st.monitors, Req#ioq_request.fd, IOQPid),
+ {reply, IOQPid, St};
handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) ->
Caller = case FdPid of
undefined -> From;
@@ -134,7 +169,7 @@ handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) -
end,
IOQPid = case khash:get(St#st.pid_idx, Name, not_found) of
not_found ->
- {ok, Pid} = ioq_server2:start_link({user, Name}),
+ {ok, Pid} = ioq_server2:start_link({?BY_USER, Name}),
khash:put(St#st.pid_idx, Name, Pid),
khash:put(St#st.pid_idx, Pid, Name),
Pid;
@@ -151,7 +186,7 @@ handle_call({fetch, DbName, _UserCtx, FdPid}, From, #st{dispatch=?BY_SHARD}=St)
%% TODO: DbName = drop_compact_ext(DbName0),
IOQPid = case khash:get(St#st.pid_idx, DbName, not_found) of
not_found ->
- {ok, Pid} = ioq_server2:start_link({shard, DbName}),
+ {ok, Pid} = ioq_server2:start_link({?BY_SHARD, DbName}),
khash:put(St#st.pid_idx, DbName, Pid),
khash:put(St#st.pid_idx, Pid, DbName),
Pid;
@@ -210,7 +245,8 @@ add_monitor(Mons, FdPid, IOQPid) ->
not_found ->
Ref0 = erlang:monitor(process, FdPid),
khash:put(Mons, Ref0, PidKey),
- khash:put(Mons, PidKey, Ref0);
+ khash:put(Mons, PidKey, Ref0),
+ khash:put(Mons, FdPid, Ref0);
Ref0 ->
Ref0
end,
@@ -233,12 +269,13 @@ drop_monitor(Mons, Ref) when is_reference(Ref) ->
not_found ->
%% TODO: shouldn't happen
throw(unexpected);
- {_FdPid, IOQPid}=PidKey ->
+ {FdPid, IOQPid}=PidKey ->
case khash:get(Mons, IOQPid, not_found) of
not_found ->
%% TODO: shouldn't happen
throw(unexpected);
Refs ->
+ khash:del(Mons, FdPid),
khash:del(Mons, Ref),
khash:del(Mons, PidKey),
case lists:delete(Ref, Refs) of
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index 595babe..85e3644 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -112,13 +112,14 @@ call(Fd, Msg, Dimensions) when Dimensions =/= undefined ->
_ ->
Server = case ioq_opener:get_pid_for(Fd) of
undefined ->
- %%case ioq_opener:get_pid_for(Req#ioq_request.shard) of
- %% undefined ->
- %% ioq_server2;
- %% IOQPid ->
- %% IOQPid
- %%end;
- ioq_server2;
+ IOQPid = case ioq_opener:fetch_pid_for(Req) of
+ undefined ->
+ ioq_server2;
+ IOQPid0 ->
+ IOQPid0
+ end,
+ ioq_opener:set_pid_for(Fd, IOQPid),
+ IOQPid;
IOQPid ->
IOQPid
end,
@@ -270,9 +271,9 @@ start_link() ->
start_link(?MODULE) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [{global, ?MODULE}], []);
-start_link({user, _Name}=User) ->
+start_link({by_user, _Name}=User) ->
gen_server:start_link(?MODULE, [User], []);
-start_link({shard, _Name}=Shard) ->
+start_link({by_shard, _Name}=Shard) ->
gen_server:start_link(?MODULE, [Shard], []).
@@ -803,11 +804,6 @@ test_simple_dedupe(St0) ->
from = FromA,
key = {Fd, Pos}
},
- _Request1B = Request0#ioq_request{
- init_priority = Priority,
- from = FromA,
- key = {Fd, Pos}
- },
{noreply, St2, 0} = handle_call(Request0, FromA, St1),
{noreply, St3, 0} = handle_call(Request0, FromB, St2),
{reply, RespState, _St4, 0} = handle_call(get_state, FromA, St3),
@@ -877,7 +873,7 @@ test_auto_scale(#state{queue=HQ}=St0) ->
{_, #ioq_request{init_priority=PriorityA2}} = hqueue:extract_max(HQ),
Tests0 = [?_assertEqual(PriorityA, PriorityA2)],
{_St, Tests} = lists:foldl(
- fun(_N, {#state{iterations=I, resize_limit=RL}=StN0, TestsN}) ->
+ fun(_N, {#state{iterations=I, resize_limit=_RL}=StN0, TestsN}) ->
ReqN = BaseReq#ioq_request{ref=make_ref()},
ExpectedPriority = case I == 1 of
false -> PriorityA;
@@ -968,7 +964,7 @@ cleanup(Servers) ->
instantiate(S) ->
- Old = ?DEFAULT_CONCURRENCY * length(ioq_sup:get_ioq2_servers()),
+ Old = ?DEFAULT_CONCURRENCY * (1 + length(shards())),
[{inparallel, lists:map(fun(IOClass) ->
lists:map(fun(Shard) ->
check_call(S, make_ref(), priority(IOClass, Shard))
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl
index c2502c5..c228241 100644
--- a/test/ioq_tests.erl
+++ b/test/ioq_tests.erl
@@ -35,14 +35,16 @@ cleanup({Apps, Server}) ->
exit(Server, kill).
instantiate({_, S}) ->
+ Shards = shards(),
[{inparallel, lists:map(fun(IOClass) ->
lists:map(fun(Shard) ->
check_call(S, make_ref(), priority(IOClass, Shard))
- end, shards())
+ end, Shards)
end, io_classes())},
case ioq:ioq2_enabled() of
true ->
- ?_assertEqual(1, ioq:set_disk_concurrency(10));
+ %% TODO: don't assume IOQ2 concurrency is 1
+ ?_assertEqual(1 + length(Shards), ioq:set_disk_concurrency(10));
false ->
?_assertEqual(20, ioq:set_disk_concurrency(10))
end,