You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2021/08/24 21:54:56 UTC

[couchdb-ioq] 03/07: Support finding ioq pids by #ioq_request{}

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 034db83e7a56a5a5c14ec834c5a3160a960f940a
Author: Russell Branca <ch...@apache.org>
AuthorDate: Wed Mar 6 23:03:55 2019 +0000

    Support finding ioq pids by #ioq_request{}
---
 src/ioq_opener.erl  | 53 +++++++++++++++++++++++++++++++++++++++++++++--------
 src/ioq_server2.erl | 28 ++++++++++++----------------
 test/ioq_tests.erl  |  6 ++++--
 3 files changed, 61 insertions(+), 26 deletions(-)

diff --git a/src/ioq_opener.erl b/src/ioq_opener.erl
index d85991e..5bcf065 100644
--- a/src/ioq_opener.erl
+++ b/src/ioq_opener.erl
@@ -12,7 +12,7 @@
 ]).
 -export([
     start_link/0,
-    %%fetch_pid_for/1,
+    fetch_pid_for/1,
     fetch_pid_for/2,
     fetch_pid_for/3,
     get_pid_for/1,
@@ -24,10 +24,14 @@
 
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
 
 
 -define(BY_USER, by_user).
 -define(BY_SHARD, by_shard).
+-define(BY_CLASS, by_class).
+-define(BY_FD, by_fd).
+-define(BY_DB, by_db).
 -define(DEFAULT_DISPATCH, ?BY_SHARD).
 -define(PDICT_MARKER, ioq_pid_for).
 
@@ -36,7 +40,7 @@
     idle = [] :: [{erlang:timestamp(), pid()}],
     pid_idx :: khash:khash(),
     monitors :: khash:khash(),
-    dispatch :: by_shard | by_user | undefined
+    dispatch :: ?BY_SHARD | ?BY_DB | ?BY_USER | ?BY_CLASS | ?BY_FD | undefined
  }).
 
 
@@ -46,6 +50,10 @@
 %%    fetch_pid_for(DbName, self()).
 
 
+fetch_pid_for(#ioq_request{}=Req) ->
+    gen_server:call(?MODULE, {fetch, Req}, infinity).
+
+
 %% TODO: cleanup the overloaded arity once experiments concluded
 %%fetch_pid_for(DbName, undefined) when is_binary(DbName) ->
 %%    fetch_pid_for(DbName, undefined, self());
@@ -111,8 +119,11 @@ init([]) ->
     {ok, Monitors} = khash:new(),
     Dispatch = case config:get("ioq.opener", "dispatch", undefined) of
         "by_shard" -> ?BY_SHARD;
-        "by_user" -> ?BY_USER;
-        _ -> ?DEFAULT_DISPATCH
+        "by_db"    -> ?BY_DB;
+        "by_user"  -> ?BY_USER;
+        "by_class" -> ?BY_CLASS;
+        "by_fd"    -> ?BY_FD;
+        _          -> ?DEFAULT_DISPATCH
     end,
     St = #st{
         pid_idx = PidIdx,
@@ -122,6 +133,30 @@ init([]) ->
     {ok, St}.
 
 
+handle_call({fetch, #ioq_request{}=Req}, _From, #st{dispatch=Dispatch}=St) ->
+    Key = case Dispatch of
+        ?BY_SHARD ->
+            Req#ioq_request.shard;
+        ?BY_DB ->
+            Req#ioq_request.db;
+        ?BY_USER ->
+            Req#ioq_request.user;
+        ?BY_CLASS ->
+            Req#ioq_request.class;
+        ?BY_FD ->
+            {fd, Req#ioq_request.fd}
+    end,
+    IOQPid = case khash:get(St#st.pid_idx, Key, not_found) of
+        not_found ->
+            {ok, Pid} = ioq_server2:start_link({Dispatch, Key}),
+            khash:put(St#st.pid_idx, Key, Pid),
+            khash:put(St#st.pid_idx, Pid, Key),
+            Pid;
+        Pid ->
+            Pid
+    end,
+    ok = add_monitor(St#st.monitors, Req#ioq_request.fd, IOQPid),
+    {reply, IOQPid, St};
 handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) ->
     Caller = case FdPid of
         undefined -> From;
@@ -134,7 +169,7 @@ handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) -
     end,
     IOQPid = case khash:get(St#st.pid_idx, Name, not_found) of
         not_found ->
-            {ok, Pid} = ioq_server2:start_link({user, Name}),
+            {ok, Pid} = ioq_server2:start_link({?BY_USER, Name}),
             khash:put(St#st.pid_idx, Name, Pid),
             khash:put(St#st.pid_idx, Pid, Name),
             Pid;
@@ -151,7 +186,7 @@ handle_call({fetch, DbName, _UserCtx, FdPid}, From, #st{dispatch=?BY_SHARD}=St)
     %% TODO: DbName = drop_compact_ext(DbName0),
     IOQPid = case khash:get(St#st.pid_idx, DbName, not_found) of
         not_found ->
-            {ok, Pid} = ioq_server2:start_link({shard, DbName}),
+            {ok, Pid} = ioq_server2:start_link({?BY_SHARD, DbName}),
             khash:put(St#st.pid_idx, DbName, Pid),
             khash:put(St#st.pid_idx, Pid, DbName),
             Pid;
@@ -210,7 +245,8 @@ add_monitor(Mons, FdPid, IOQPid) ->
         not_found ->
             Ref0 = erlang:monitor(process, FdPid),
             khash:put(Mons, Ref0, PidKey),
-            khash:put(Mons, PidKey, Ref0);
+            khash:put(Mons, PidKey, Ref0),
+            khash:put(Mons, FdPid, Ref0);
         Ref0 ->
             Ref0
     end,
@@ -233,12 +269,13 @@ drop_monitor(Mons, Ref) when is_reference(Ref) ->
         not_found ->
             %% TODO: shouldn't happen
             throw(unexpected);
-        {_FdPid, IOQPid}=PidKey ->
+        {FdPid, IOQPid}=PidKey ->
             case khash:get(Mons, IOQPid, not_found) of
                 not_found ->
                     %% TODO: shouldn't happen
                     throw(unexpected);
                 Refs ->
+                    khash:del(Mons, FdPid),
                     khash:del(Mons, Ref),
                     khash:del(Mons, PidKey),
                     case lists:delete(Ref, Refs) of
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index 595babe..85e3644 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -112,13 +112,14 @@ call(Fd, Msg, Dimensions) when Dimensions =/= undefined ->
         _ ->
             Server = case ioq_opener:get_pid_for(Fd) of
                 undefined ->
-                    %%case ioq_opener:get_pid_for(Req#ioq_request.shard) of
-                    %%    undefined ->
-                    %%        ioq_server2;
-                    %%    IOQPid ->
-                    %%        IOQPid
-                    %%end;
-                    ioq_server2;
+                    IOQPid = case ioq_opener:fetch_pid_for(Req) of
+                        undefined ->
+                            ioq_server2;
+                        IOQPid0 ->
+                            IOQPid0
+                    end,
+                    ioq_opener:set_pid_for(Fd, IOQPid),
+                    IOQPid;
                 IOQPid ->
                     IOQPid
             end,
@@ -270,9 +271,9 @@ start_link() ->
 
 start_link(?MODULE) ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [{global, ?MODULE}], []);
-start_link({user, _Name}=User) ->
+start_link({by_user, _Name}=User) ->
     gen_server:start_link(?MODULE, [User], []);
-start_link({shard, _Name}=Shard) ->
+start_link({by_shard, _Name}=Shard) ->
     gen_server:start_link(?MODULE, [Shard], []).
 
 
@@ -803,11 +804,6 @@ test_simple_dedupe(St0) ->
         from = FromA,
         key = {Fd, Pos}
     },
-    _Request1B = Request0#ioq_request{
-        init_priority = Priority,
-        from = FromA,
-        key = {Fd, Pos}
-    },
     {noreply, St2, 0} = handle_call(Request0, FromA, St1),
     {noreply, St3, 0} = handle_call(Request0, FromB, St2),
     {reply, RespState, _St4, 0} = handle_call(get_state, FromA, St3),
@@ -877,7 +873,7 @@ test_auto_scale(#state{queue=HQ}=St0) ->
     {_, #ioq_request{init_priority=PriorityA2}} = hqueue:extract_max(HQ),
     Tests0 = [?_assertEqual(PriorityA, PriorityA2)],
     {_St, Tests} = lists:foldl(
-        fun(_N, {#state{iterations=I, resize_limit=RL}=StN0, TestsN}) ->
+        fun(_N, {#state{iterations=I, resize_limit=_RL}=StN0, TestsN}) ->
             ReqN = BaseReq#ioq_request{ref=make_ref()},
             ExpectedPriority = case I == 1 of
                 false -> PriorityA;
@@ -968,7 +964,7 @@ cleanup(Servers) ->
 
 
 instantiate(S) ->
-    Old = ?DEFAULT_CONCURRENCY * length(ioq_sup:get_ioq2_servers()),
+    Old = ?DEFAULT_CONCURRENCY * (1 + length(shards())),
     [{inparallel, lists:map(fun(IOClass) ->
         lists:map(fun(Shard) ->
             check_call(S, make_ref(), priority(IOClass, Shard))
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl
index c2502c5..c228241 100644
--- a/test/ioq_tests.erl
+++ b/test/ioq_tests.erl
@@ -35,14 +35,16 @@ cleanup({Apps, Server}) ->
     exit(Server, kill).
 
 instantiate({_, S}) ->
+    Shards = shards(),
     [{inparallel, lists:map(fun(IOClass) ->
         lists:map(fun(Shard) ->
             check_call(S, make_ref(), priority(IOClass, Shard))
-        end, shards())
+        end, Shards)
     end, io_classes())},
     case ioq:ioq2_enabled() of
         true ->
-            ?_assertEqual(1, ioq:set_disk_concurrency(10));
+            %% TODO: don't assume IOQ2 concurrency is 1
+            ?_assertEqual(1 + length(Shards), ioq:set_disk_concurrency(10));
         false ->
             ?_assertEqual(20, ioq:set_disk_concurrency(10))
     end,