You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2021/08/24 21:54:54 UTC

[couchdb-ioq] 01/07: WIP: IOQ2 per shard/user

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 54cb8abf851ef905d1620a198807aafe243f6c42
Author: Russell Branca <ch...@apache.org>
AuthorDate: Thu Feb 28 18:48:21 2019 +0000

    WIP: IOQ2 per shard/user
---
 src/ioq.erl         |  24 ++++-
 src/ioq_opener.erl  | 256 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/ioq_server2.erl |  46 ++++++----
 src/ioq_sup.erl     |  17 ++--
 test/ioq_tests.erl  |   7 +-
 5 files changed, 318 insertions(+), 32 deletions(-)

diff --git a/src/ioq.erl b/src/ioq.erl
index 160a448..2530dba 100644
--- a/src/ioq.erl
+++ b/src/ioq.erl
@@ -15,7 +15,12 @@
     get_disk_queues/0, get_osproc_queues/0, get_osproc_requests/0,
     get_disk_counters/0, get_disk_concurrency/0]).
 -export([
-    ioq2_enabled/0
+    ioq2_enabled/0,
+    fetch_pid_for/1,
+    fetch_pid_for/2,
+    fetch_pid_for/3,
+    get_pid_for/1,
+    set_pid_for/2
 ]).
 
 -define(APPS, [config, folsom, couch_stats, ioq]).
@@ -72,5 +77,26 @@ get_osproc_requests() ->
     gen_server:call(ioq_osq, get_requests).
 
 ioq2_enabled() ->
-    config:get_boolean("ioq2", "enabled", false).
+    config:get_boolean("ioq2", "enabled", true).
+
+%% Return the IOQ2 pid for DbName, monitoring the calling process as its fd
+%% pid. ioq_opener has no fetch_pid_for/1, so pass self() to the /2 form.
+fetch_pid_for(DbName) ->
+    ioq_opener:fetch_pid_for(DbName, self()).
+
+%% Return the IOQ2 pid for DbName, monitoring FdPid as one of its users.
+fetch_pid_for(DbName, FdPid) ->
+    ioq_opener:fetch_pid_for(DbName, FdPid).
+
+%% As fetch_pid_for/2, additionally passing the request's user context.
+fetch_pid_for(DbName, UserCtx, FdPid) ->
+    ioq_opener:fetch_pid_for(DbName, UserCtx, FdPid).
+
+%% Return the IOQ2 pid cached in this process for an fd pid or db name.
+get_pid_for(FdPid) ->
+    ioq_opener:get_pid_for(FdPid).
+
+%% Cache IOQPid in this process under an fd pid or db name.
+set_pid_for(FdPid, IOQPid) ->
+    ioq_opener:set_pid_for(FdPid, IOQPid).
 
diff --git a/src/ioq_opener.erl b/src/ioq_opener.erl
new file mode 100644
index 0000000..d85991e
--- /dev/null
+++ b/src/ioq_opener.erl
@@ -0,0 +1,296 @@
+-module(ioq_opener).
+-behavior(gen_server).
+
+%% Per-shard / per-user IOQ2 dispatcher. Depending on the [ioq.opener]
+%% dispatch setting (read once in init/1), each shard name or user name is
+%% served by its own ioq_server2 process, started and linked on first fetch.
+%% Every FdPid that fetches an IOQ pid is monitored, and the IOQ pid is
+%% stopped once the last such FdPid has gone down.
+
+
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+-export([
+    start_link/0,
+    %%fetch_pid_for/1,
+    fetch_pid_for/2,
+    fetch_pid_for/3,
+    get_pid_for/1,
+    set_pid_for/2,
+    get_ioq_pids/0,
+    get_pid_idx/0,
+    get_monitor_idx/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(BY_USER, by_user).
+-define(BY_SHARD, by_shard).
+-define(DEFAULT_DISPATCH, ?BY_SHARD).
+%% process dictionary key tag used by get_pid_for/1 and set_pid_for/2
+-define(PDICT_MARKER, ioq_pid_for).
+
+
+-record(st, {
+    %% not used yet
+    idle = [] :: [{erlang:timestamp(), pid()}],
+    %% two-way index: shard/user name -> IOQ pid and IOQ pid -> name
+    pid_idx :: khash:khash(),
+    %% monitor bookkeeping, all in one table:
+    %%   Ref -> {FdPid, IOQPid}, {FdPid, IOQPid} -> Ref, IOQPid -> [Ref]
+    monitors :: khash:khash(),
+    dispatch :: by_shard | by_user | undefined
+}).
+
+
+%% HACK: experiment to allow for spawning IOQ2 pids prior to spawning
+%% the associated couch_file pids
+%%fetch_pid_for(DbName) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, self()).
+
+
+%% TODO: cleanup the overloaded arity once experiments concluded
+%%fetch_pid_for(DbName, undefined) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, undefined, self());
+%%fetch_pid_for(DbName, #user_ctx{}=Ctx) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, Ctx, self());
+%%
+%% Same as fetch_pid_for/3 without a user context.
+fetch_pid_for(DbName, FdPid) when is_binary(DbName), is_pid(FdPid) ->
+    fetch_pid_for(DbName, undefined, FdPid).
+
+
+%% Return the IOQ pid serving DbName (by_shard) or UserCtx's user (by_user),
+%% starting one if needed, and monitor FdPid as one of its users. Returns
+%% undefined when dispatching by user without a #user_ctx{}; that value can
+%% be handed straight to set_pid_for/2, which ignores it.
+fetch_pid_for(DbName, UserCtx, FdPid) when is_binary(DbName), is_pid(FdPid) ->
+    gen_server:call(?MODULE, {fetch, DbName, UserCtx, FdPid}, infinity).
+
+
+%% Return the IOQ pid cached in the calling process's dictionary under
+%% FdPid, or under DbName with its extension stripped; undefined if none.
+get_pid_for(undefined) ->
+    undefined;
+get_pid_for(DbName) when is_binary(DbName) ->
+    %% HACK: use the same shard format as per #ioq_request{} to post facto
+    %% associate an IOQ pid with a dbname for when we set the IOQ2 pid prior
+    %% to having a db handle
+    erlang:get({?PDICT_MARKER, filename:rootname(DbName)});
+get_pid_for(FdPid) when is_pid(FdPid) ->
+    erlang:get({?PDICT_MARKER, FdPid}).
+
+
+%% Cache IOQPid in the calling process's dictionary under FdPid, or under
+%% DbName with its extension stripped. An undefined IOQPid is ignored.
+set_pid_for(_, undefined) ->
+    ok;
+set_pid_for(DbName, IOQPid) when is_binary(DbName), is_pid(IOQPid) ->
+    %% HACK: use the same shard format as per #ioq_request{} to post facto
+    %% associate an IOQ pid with a dbname for when we set the IOQ2 pid prior
+    %% to having a db handle
+    erlang:put({?PDICT_MARKER, filename:rootname(DbName)}, IOQPid),
+    ok;
+set_pid_for(FdPid, IOQPid) when is_pid(FdPid), is_pid(IOQPid) ->
+    erlang:put({?PDICT_MARKER, FdPid}, IOQPid),
+    ok.
+
+
+%% Snapshot of the name <-> IOQ pid index as a list of pairs.
+get_pid_idx() ->
+    gen_server:call(?MODULE, get_pid_idx, infinity).
+
+
+%% Snapshot of the monitor bookkeeping table as a list of pairs.
+get_monitor_idx() ->
+    gen_server:call(?MODULE, get_monitor_idx, infinity).
+
+
+%% All IOQ pids currently in the index (its pid-valued keys).
+get_ioq_pids() ->
+    lists:foldl(
+      fun
+        ({K, _V}, Acc) when is_pid(K) ->
+              [K | Acc];
+        (_, Acc) ->
+              Acc
+      end, [], get_pid_idx()).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init([]) ->
+    %% IOQ pids are linked to this server; their exits arrive as 'EXIT' msgs
+    process_flag(trap_exit, true),
+    {ok, PidIdx} = khash:new(),
+    {ok, Monitors} = khash:new(),
+    Dispatch = case config:get("ioq.opener", "dispatch", undefined) of
+        "by_shard" -> ?BY_SHARD;
+        "by_user" -> ?BY_USER;
+        _ -> ?DEFAULT_DISPATCH
+    end,
+    St = #st{
+        pid_idx = PidIdx,
+        monitors = Monitors,
+        dispatch = Dispatch
+    },
+    {ok, St}.
+
+
+%% Unattributable fetches reply undefined: a throw here would be taken by
+%% gen_server as the callback's return value and crash this server.
+handle_call({fetch, DbName, UserCtx, FdPid}, From, #st{}=St) ->
+    Caller = caller_pid(FdPid, From),
+    case dispatch_target(St#st.dispatch, DbName, UserCtx) of
+        undefined ->
+            {reply, undefined, St};
+        {Key, StartArg} ->
+            IOQPid = lookup_or_start(St#st.pid_idx, Key, StartArg),
+            ok = add_monitor(St#st.monitors, Caller, IOQPid),
+            {reply, IOQPid, St}
+    end;
+handle_call(get_pid_idx, _From, #st{}=St) ->
+    {reply, khash:to_list(St#st.pid_idx), St};
+handle_call(get_monitor_idx, _From, #st{}=St) ->
+    {reply, khash:to_list(St#st.monitors), St};
+handle_call(_, _From, St) ->
+    {reply, ok, St}.
+
+
+handle_cast(_Msg, St) ->
+    {noreply, St}.
+
+
+%% A monitored FdPid went down: drop its monitor and, if it was the last
+%% user of its IOQ pid (which drop_monitor/2 then stops), forget that pid.
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) ->
+    case drop_monitor(St#st.monitors, Ref) of
+        {IOQPid, []} ->
+            forget_ioq_pid(St#st.pid_idx, IOQPid);
+        {_IOQPid, _Refs} ->
+            ok
+    end,
+    {noreply, St};
+%% A linked IOQ pid exited: forget it so the next fetch starts a new one.
+handle_info({'EXIT', Pid, _}, St) ->
+    forget_ioq_pid(St#st.pid_idx, Pid),
+    {noreply, St};
+handle_info(_Info, St) ->
+    {noreply, St}.
+
+
+terminate(_Reason, _St) ->
+    ok.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+%% The process to monitor for a fetch: FdPid, or the calling process when
+%% no FdPid is given (From is a {Pid, Tag} tuple, not a pid).
+caller_pid(undefined, {Pid, _Tag}) ->
+    Pid;
+caller_pid(FdPid, _From) when is_pid(FdPid) ->
+    FdPid.
+
+
+%% {IndexKey, ioq_server2:start_link/1 argument} for a fetch, or undefined
+%% when it cannot be attributed (by_user dispatch without a #user_ctx{}).
+dispatch_target(?BY_SHARD, DbName, _UserCtx) ->
+    %% TODO: DbName = drop_compact_ext(DbName0),
+    {DbName, {shard, DbName}};
+dispatch_target(?BY_USER, _DbName, #user_ctx{name=Name}) ->
+    {Name, {user, Name}};
+dispatch_target(?BY_USER, _DbName, undefined) ->
+    %% TODO: support unknown user
+    undefined.
+
+
+%% Return the IOQ pid indexed under Key; if there is none, start and link a
+%% new ioq_server2 with StartArg and index it in both directions.
+lookup_or_start(PidIdx, Key, StartArg) ->
+    case khash:get(PidIdx, Key, not_found) of
+        not_found ->
+            {ok, Pid} = ioq_server2:start_link(StartArg),
+            khash:put(PidIdx, Key, Pid),
+            khash:put(PidIdx, Pid, Key),
+            Pid;
+        Pid ->
+            Pid
+    end.
+
+
+%% Remove both directions of the name <-> IOQPid mapping, if still present.
+forget_ioq_pid(PidIdx, IOQPid) ->
+    case khash:get(PidIdx, IOQPid, not_found) of
+        not_found ->
+            ok;
+        Key ->
+            khash:del(PidIdx, IOQPid),
+            khash:del(PidIdx, Key)
+    end.
+
+
+%% Monitor FdPid (once per {FdPid, IOQPid} pair) and record the monitor
+%% reference in IOQPid's list of users.
+add_monitor(Mons, FdPid, IOQPid) ->
+    PidKey = {FdPid, IOQPid},
+    Ref = case khash:get(Mons, PidKey, not_found) of
+        not_found ->
+            Ref0 = erlang:monitor(process, FdPid),
+            khash:put(Mons, Ref0, PidKey),
+            khash:put(Mons, PidKey, Ref0),
+            %% yield the reference itself, not khash:put/3's `ok`
+            Ref0;
+        Ref0 ->
+            Ref0
+    end,
+    Refs = khash:get(Mons, IOQPid, []),
+    case lists:member(Ref, Refs) of
+        true ->
+            ok;
+        false ->
+            khash:put(Mons, IOQPid, [Ref | Refs])
+    end,
+    ok.
+
+
+%% Forget monitor Ref. If it was the last one on its IOQ pid, stop that pid
+%% (unlinked first so no 'EXIT' for it reaches this server) and return
+%% {IOQPid, []}; otherwise return {IOQPid, RemainingRefs}.
+drop_monitor(Mons, Ref) when is_reference(Ref) ->
+    {_FdPid, IOQPid} = PidKey = fetch_entry(Mons, Ref),
+    Refs = fetch_entry(Mons, IOQPid),
+    khash:del(Mons, Ref),
+    khash:del(Mons, PidKey),
+    case lists:delete(Ref, Refs) of
+        [] ->
+            unlink(IOQPid),
+            khash:del(Mons, IOQPid),
+            exit(IOQPid, idle),
+            {IOQPid, []};
+        Refs1 ->
+            khash:put(Mons, IOQPid, Refs1),
+            {IOQPid, Refs1}
+    end.
+
+
+%% Every monitor this server creates is indexed by add_monitor/3, so a
+%% missing entry means the bookkeeping is corrupt: crash loudly.
+fetch_entry(Mons, Key) ->
+    case khash:get(Mons, Key, not_found) of
+        not_found -> erlang:error({missing_monitor_entry, Key});
+        Value -> Value
+    end.
+
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index dadd44d..b3dc24f 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -23,6 +23,8 @@
     code_change/3
 ]).
 -export([
+    start_link/0,
+    start_link/1,
     start_link/3,
     call/3,
     pcall/1,
@@ -108,21 +110,17 @@ call(Fd, Msg, Dimensions) ->
                 [couchdb, io_queue2, RW, bypassed_count]),
             gen_server:call(Fd, Msg, infinity);
         _ ->
-            DispatchStrategy = config:get(
-                "ioq2", "dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER),
-            Server = case DispatchStrategy of
-                ?DISPATCH_RANDOM ->
-                    SID = rand:uniform(erlang:system_info(schedulers)),
-                    ?SERVER_ID(SID);
-                ?DISPATCH_FD_HASH ->
-                    NumSchedulers = erlang:system_info(schedulers),
-                    SID = 1 + (erlang:phash2(Fd) rem NumSchedulers),
-                    ?SERVER_ID(SID);
-                ?DISPATCH_SINGLE_SERVER ->
-                    ?SERVER_ID(1);
-                _ ->
-                    SID = erlang:system_info(scheduler_id),
-                    ?SERVER_ID(SID)
+            Server = case ioq_opener:get_pid_for(Fd) of
+                undefined ->
+                    %%case ioq_opener:get_pid_for(Req#ioq_request.shard) of
+                    %%    undefined ->
+                    %%        ioq_server2;
+                    %%    IOQPid ->
+                    %%        IOQPid
+                    %%end;
+                    ioq_server2;
+                IOQPid ->
+                    IOQPid
             end,
             gen_server:call(Server, Req, infinity)
     end.
@@ -266,6 +264,18 @@ update_config() ->
     gen_server:call(?SERVER_ID(1), update_config, infinity).
 
 
+%% Start the ioq_server2 instance registered locally as ioq_server2.
+start_link() ->
+    start_link(?MODULE).
+
+%% Start ioq_server2: the registered global instance, or an anonymous one
+%% dedicated to a {user, Name} or {shard, Name} (see ioq_opener).
+start_link({Scope, _Name} = Id) when Scope =:= user; Scope =:= shard ->
+    gen_server:start_link(?MODULE, [Id], []);
+start_link(?MODULE = Name) ->
+    gen_server:start_link({local, Name}, ?MODULE, [{global, Name}], []).
+
+
 start_link(Name, SID, Bind) ->
     Options = case Bind of
         true -> [{scheduler, SID}];
@@ -274,7 +284,7 @@ start_link(Name, SID, Bind) ->
     gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options).
 
 
-init([Name, SID]) ->
+init([{Type, Name}]) ->
     {ok, HQ} = hqueue:new(),
     {ok, Reqs} = khash:new(),
     {ok, Waiters} = khash:new(),
@@ -283,7 +293,7 @@ init([Name, SID]) ->
         reqs = Reqs,
         waiters = Waiters,
         server_name = Name,
-        scheduler_id = SID
+        scheduler_id = Type
     },
     {ok, update_config_int(State)}.
 
@@ -630,7 +640,7 @@ mock_server(Config) ->
     meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
         Default
     end),
-    {ok, State} = ioq_server2:init([?SERVER_ID(1), 1]),
+    {ok, State} = ioq_server2:init([{global, ?SERVER_ID(1)}]),
     State.
 
 
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
index 8bfda77..973886a 100644
--- a/src/ioq_sup.erl
+++ b/src/ioq_sup.erl
@@ -15,7 +15,7 @@
 -vsn(1).
 -behaviour(config_listener).
 -export([start_link/0, init/1]).
--export([get_ioq2_servers/0]).
+-export([get_ioq2_servers/0, get_ioq2_servers_new/0]).
 -export([handle_config_change/5, handle_config_terminate/3]).
 -export([processes/1]).
 
@@ -28,26 +28,20 @@ start_link() ->
 
 init([]) ->
     ok = ioq_config_listener:subscribe(),
-    IOQ2Children = ioq_server2_children(),
     {ok, {
         {one_for_one, 5, 10},
         [
             ?CHILD_WITH_ARGS(config_listener_mon, worker, [?MODULE, nil]),
+            ?CHILD(ioq_opener, worker),
             ?CHILD(ioq_server, worker),
+            ?CHILD(ioq_server2, worker),
             ?CHILD(ioq_osq, worker)
-            | IOQ2Children
         ]
     }}.
 
-ioq_server2_children() ->
-    Bind = config:get_boolean("ioq2", "bind_to_schedulers", false),
-    ioq_server2_children(erlang:system_info(schedulers), Bind).
 
-ioq_server2_children(Count, Bind) ->
-    lists:map(fun(I) ->
-        Name = list_to_atom("ioq_server_" ++ integer_to_list(I)),
-        {Name, {ioq_server2, start_link, [Name, I, Bind]}, permanent, 5000, worker, [Name]}
-    end, lists:seq(1, Count)).
+get_ioq2_servers_new() -> % global ioq_server2 first, then ioq_opener's pids
+    [ioq_server2] ++ ioq_opener:get_ioq_pids().
 
 get_ioq2_servers() ->
     lists:map(fun(I) ->
@@ -92,3 +86,4 @@ filter_children(RegExp) ->
             _ -> false
         end
     end, supervisor:which_children(?MODULE)).
+
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl
index b6b7bad..c2502c5 100644
--- a/test/ioq_tests.erl
+++ b/test/ioq_tests.erl
@@ -40,7 +40,12 @@ instantiate({_, S}) ->
             check_call(S, make_ref(), priority(IOClass, Shard))
         end, shards())
     end, io_classes())},
-    ?_assertEqual(20, ioq:set_disk_concurrency(10)),
+    case ioq:ioq2_enabled() of
+        true ->
+            ?_assertEqual(1, ioq:set_disk_concurrency(10));
+        false ->
+            ?_assertEqual(20, ioq:set_disk_concurrency(10))
+    end,
     ?_assertError(badarg, ioq:set_disk_concurrency(0)),
     ?_assertError(badarg, ioq:set_disk_concurrency(-1)),
     ?_assertError(badarg, ioq:set_disk_concurrency(foo))].