[couchdb-ioq] branch ioq-per-shard-or-user updated (c18f705 -> 1dee640)

chewbranca pushed a change to branch ioq-per-shard-or-user
 discard c18f705  WIP: IOQ2 per shard/user
     new 1dee640  WIP: IOQ2 per shard/user

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (c18f705)
            \
             N -- N -- N   refs/heads/ioq-per-shard-or-user (1dee640)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 src/ioq_opener.erl | 1 -
 1 file changed, 1 deletion(-)

[couchdb-ioq] 01/01: WIP: IOQ2 per shard/user

chewbranca pushed a commit to branch ioq-per-shard-or-user
commit 1dee640342c4db8756606bff3962cfed6efc8bc1
Author: Russell Branca <>
AuthorDate: Thu Feb 28 18:48:21 2019 +0000

    WIP: IOQ2 per shard/user
 src/ioq.erl         |  24 ++++-
 src/ioq_opener.erl  | 256 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/ioq_server2.erl |  47 ++++++----
 src/ioq_sup.erl     |  17 +---
 test/ioq_tests.erl  |   7 +-
 5 files changed, 315 insertions(+), 36 deletions(-)

diff --git a/src/ioq.erl b/src/ioq.erl
index 160a448..2530dba 100644
--- a/src/ioq.erl
+++ b/src/ioq.erl
@@ -15,7 +15,12 @@
     get_disk_queues/0, get_osproc_queues/0, get_osproc_requests/0,
     get_disk_counters/0, get_disk_concurrency/0]).
-    ioq2_enabled/0
+    ioq2_enabled/0,
+    fetch_pid_for/1,
+    fetch_pid_for/2,
+    fetch_pid_for/3,
+    get_pid_for/1,
+    set_pid_for/2
 -define(APPS, [config, folsom, couch_stats, ioq]).
@@ -72,5 +77,20 @@ get_osproc_requests() ->
     gen_server:call(ioq_osq, get_requests).
 ioq2_enabled() ->
-    config:get_boolean("ioq2", "enabled", false).
+    config:get_boolean("ioq2", "enabled", true).
+%% @doc Fetch the IOQ2 pid for `DbName' on behalf of the calling process.
+%% ioq_opener does not export fetch_pid_for/1 (its arity-1 clause is
+%% commented out), so calling it would fail with `undef'. Delegate to the
+%% arity-2 form with self() as the owning pid, which is what the disabled
+%% arity-1 clause in ioq_opener did.
+fetch_pid_for(DbName) ->
+    ioq_opener:fetch_pid_for(DbName, self()).
+%% @doc Fetch the IOQ2 pid for `DbName', owned by the process `FdPid'.
+fetch_pid_for(DbName, FdPid) ->
+    ioq_opener:fetch_pid_for(DbName, FdPid).
+%% @doc As fetch_pid_for/2, additionally passing the user context that
+%% ioq_opener uses when it dispatches by user.
+fetch_pid_for(DbName, UserCtx, FdPid) ->
+    ioq_opener:fetch_pid_for(DbName, UserCtx, FdPid).
+%% @doc Read the IOQ2 pid previously recorded for `FdPid' in the calling
+%% process (see ioq_opener:get_pid_for/1).
+get_pid_for(FdPid) ->
+    ioq_opener:get_pid_for(FdPid).
+%% @doc Record `IOQPid' for `FdPid' in the calling process
+%% (see ioq_opener:set_pid_for/2).
+set_pid_for(FdPid, IOQPid) ->
+    ioq_opener:set_pid_for(FdPid, IOQPid).
diff --git a/src/ioq_opener.erl b/src/ioq_opener.erl
new file mode 100644
index 0000000..d85991e
--- /dev/null
+++ b/src/ioq_opener.erl
@@ -0,0 +1,256 @@
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+    start_link/0,
+    %%fetch_pid_for/1,
+    fetch_pid_for/2,
+    fetch_pid_for/3,
+    get_pid_for/1,
+    set_pid_for/2,
+    get_ioq_pids/0,
+    get_pid_idx/0,
+    get_monitor_idx/0
+-define(BY_USER, by_user).
+-define(BY_SHARD, by_shard).
+-define(PDICT_MARKER, ioq_pid_for).
+-record(st, {
+    idle = [] :: [{erlang:timestamp(), pid()}],
+    pid_idx :: khash:khash(),
+    monitors :: khash:khash(),
+    dispatch :: by_shard | by_user | undefined
+ }).
+%% HACK: experiment to allow spawning IOQ2 pids prior to spawning
+%% the associated couch_file pids
+%%fetch_pid_for(DbName) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, self()).
+%% TODO: cleanup the overloaded arity once experiments concluded
+%%fetch_pid_for(DbName, undefined) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, undefined, self());
+%%fetch_pid_for(DbName, #user_ctx{}=Ctx) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, Ctx, self());
+%% @doc Fetch the IOQ2 pid for `DbName' without a user context; `FdPid' is
+%% the process the opener monitors on behalf of this request.
+fetch_pid_for(DbName, FdPid)
+  when is_binary(DbName) andalso is_pid(FdPid) ->
+    fetch_pid_for(DbName, undefined, FdPid).
+%% @doc Ask the opener server for the IOQ2 pid matching `DbName'/`UserCtx'.
+%% Blocks until the server replies (no call timeout).
+fetch_pid_for(DbName, UserCtx, FdPid)
+  when is_binary(DbName) andalso is_pid(FdPid) ->
+    Request = {fetch, DbName, UserCtx, FdPid},
+    gen_server:call(?MODULE, Request, infinity).
+%% @doc Look up the IOQ2 pid stored in the calling process's dictionary.
+%% Accepts a couch_file pid, a db name binary, or `undefined' (which always
+%% yields `undefined'). Returns `undefined' when nothing was stored.
+get_pid_for(undefined) ->
+    undefined;
+get_pid_for(FdPid) when is_pid(FdPid) ->
+    erlang:get({?PDICT_MARKER, FdPid});
+get_pid_for(DbName) when is_binary(DbName) ->
+    %% HACK: key on the db name with its extension stripped, the same shard
+    %% format as #ioq_request{}, so an IOQ pid set before a db handle exists
+    %% can be associated with the db name after the fact.
+    Shard = filename:rootname(DbName),
+    erlang:get({?PDICT_MARKER, Shard}).
+%% @doc Store `IOQPid' in the calling process's dictionary, keyed either by
+%% a couch_file pid or by a db name binary. An `undefined' IOQ pid is
+%% accepted and stores nothing. Always returns `ok'.
+set_pid_for(FdPid, IOQPid) when is_pid(FdPid), is_pid(IOQPid) ->
+    _ = erlang:put({?PDICT_MARKER, FdPid}, IOQPid),
+    ok;
+set_pid_for(DbName, IOQPid) when is_binary(DbName), is_pid(IOQPid) ->
+    %% HACK: key on the db name with its extension stripped, the same shard
+    %% format as #ioq_request{}, for when the IOQ2 pid is set before a db
+    %% handle exists.
+    Key = {?PDICT_MARKER, filename:rootname(DbName)},
+    _ = erlang:put(Key, IOQPid),
+    ok;
+set_pid_for(_Any, undefined) ->
+    ok.
+%% @doc Return the opener's pid index as a list of `{Key, Value}' pairs.
+get_pid_idx() ->
+    Request = get_pid_idx,
+    gen_server:call(?MODULE, Request, infinity).
+%% @doc Return the opener's monitor index as a list of `{Key, Value}' pairs.
+get_monitor_idx() ->
+    Request = get_monitor_idx,
+    gen_server:call(?MODULE, Request, infinity).
+%% @doc Return the pids that appear as keys in the pid index.
+%% The index stores each entry in both directions (name -> pid and
+%% pid -> name), so keeping only the pid keys lists each IOQ server once.
+get_ioq_pids() ->
+    Pids = [Key || {Key, _Value} <- get_pid_idx(), is_pid(Key)],
+    %% Reversed to keep the ordering of the fold-based original.
+    lists:reverse(Pids).
+%% @doc Start the opener as a locally registered gen_server named after
+%% this module.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+%% @doc gen_server init: trap exits (IOQ servers are linked to this
+%% process), create the two khash indexes and read the dispatch mode from
+%% the "ioq.opener"/"dispatch" config key.
+init([]) ->
+    process_flag(trap_exit, true),
+    {ok, PidIdx} = khash:new(),
+    {ok, Monitors} = khash:new(),
+    Configured = config:get("ioq.opener", "dispatch", undefined),
+    St = #st{
+        pid_idx = PidIdx,
+        monitors = Monitors,
+        dispatch = dispatch_mode(Configured)
+    },
+    {ok, St}.
+%% Map the configured string to a dispatch atom; anything else (including
+%% an unset key) falls back to ?DEFAULT_DISPATCH.
+%% NOTE(review): ?DEFAULT_DISPATCH is not defined in the visible part of
+%% this module - confirm it is defined before this point.
+dispatch_mode("by_shard") -> ?BY_SHARD;
+dispatch_mode("by_user") -> ?BY_USER;
+dispatch_mode(_Other) -> ?DEFAULT_DISPATCH.
+%% @doc gen_server call handler.
+%%
+%% `{fetch, DbName, UserCtx, FdPid}' replies with the IOQ2 pid for the
+%% request, starting one if none is indexed yet. With by_user dispatch the
+%% index key is the user name from `UserCtx'; with by_shard dispatch it is
+%% `DbName'. The owning process (`FdPid', or the caller when `FdPid' is
+%% `undefined') is monitored so the IOQ pid can be reaped once unused.
+%%
+%% by_user requests without a user context reply `undefined' instead of
+%% throwing: a throw from a gen_server callback is treated as a bad return
+%% value and would terminate the opener, and with it every linked IOQ
+%% server. `undefined' is the "no dedicated pid" value that set_pid_for/2
+%% and get_pid_for/1 already accept.
+%%
+%% `get_pid_idx' / `get_monitor_idx' reply with the respective index as a
+%% list. Any other request replies `ok'.
+handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) ->
+    case UserCtx of
+        #user_ctx{name=Name} ->
+            Owner = owner_pid(FdPid, From),
+            {reply, fetch_ioq_pid(St, Name, {user, Name}, Owner), St};
+        undefined ->
+            %% TODO: support unknown user
+            {reply, undefined, St}
+    end;
+handle_call({fetch, DbName, _UserCtx, FdPid}, From, #st{dispatch=?BY_SHARD}=St) ->
+    %% TODO: DbName = drop_compact_ext(DbName0),
+    Owner = owner_pid(FdPid, From),
+    {reply, fetch_ioq_pid(St, DbName, {shard, DbName}, Owner), St};
+handle_call(get_pid_idx, _From, #st{}=St) ->
+    {reply, khash:to_list(St#st.pid_idx), St};
+handle_call(get_monitor_idx, _From, #st{}=St) ->
+    {reply, khash:to_list(St#st.monitors), St};
+handle_call(_, _From, St) ->
+    {reply, ok, St}.
+%% Pick the process to monitor for a fetch request. `From' is a
+%% `{Pid, Tag}' tuple, not a pid, so the caller's pid has to be extracted
+%% before it can be handed to erlang:monitor/2.
+owner_pid(FdPid, _From) when is_pid(FdPid) ->
+    FdPid;
+owner_pid(undefined, {CallerPid, _Tag}) ->
+    CallerPid.
+%% Return the IOQ pid indexed under `Key', starting and indexing a new
+%% ioq_server2 (in both directions: Key -> Pid and Pid -> Key) when there
+%% is none, then register `Owner' as a monitored user of that pid.
+fetch_ioq_pid(#st{pid_idx=PidIdx, monitors=Mons}, Key, StartArg, Owner) ->
+    IOQPid = case khash:get(PidIdx, Key, not_found) of
+        not_found ->
+            {ok, Pid} = ioq_server2:start_link(StartArg),
+            khash:put(PidIdx, Key, Pid),
+            khash:put(PidIdx, Pid, Key),
+            Pid;
+        Pid ->
+            Pid
+    end,
+    ok = add_monitor(Mons, Owner, IOQPid),
+    IOQPid.
+%% @doc gen_server cast handler: every cast is ignored and the state is
+%% left untouched.
+handle_cast(_Ignored, State) ->
+    {noreply, State}.
+%% @doc gen_server info handler.
+%%
+%% 'DOWN': a monitored owner process died. Its monitor is dropped and, if
+%% that was the last reference to the IOQ pid, the pid is removed from the
+%% pid index in both directions.
+%% 'EXIT': a linked process exited. If it is an indexed IOQ pid, both of
+%% its pid index entries are removed.
+%% Anything else is ignored.
+handle_info({'DOWN', MonRef, process, _Pid, _Reason}, St) ->
+    Dropped = drop_monitor(St#st.monitors, MonRef),
+    _ = purge_if_unreferenced(Dropped, St#st.pid_idx),
+    {noreply, St};
+handle_info({'EXIT', Pid, _Reason}, St) ->
+    PidIdx = St#st.pid_idx,
+    case khash:get(PidIdx, Pid, not_found) of
+        Name when Name =/= not_found ->
+            khash:del(PidIdx, Pid),
+            khash:del(PidIdx, Name);
+        not_found ->
+            %% TODO: shouldn't happen, throw error?
+            ok
+    end,
+    {noreply, St};
+handle_info(_Other, St) ->
+    {noreply, St}.
+%% Remove both pid index entries of an IOQ pid whose last monitor
+%% reference was just dropped; leave the index alone otherwise.
+purge_if_unreferenced({IOQPid, []}, PidIdx) ->
+    Name = khash:get(PidIdx, IOQPid), %% TODO: assert found?
+    khash:del(PidIdx, IOQPid),
+    khash:del(PidIdx, Name);
+purge_if_unreferenced({_IOQPid, _Remaining}, _PidIdx) ->
+    ok.
+%% @doc gen_server terminate callback; performs no cleanup.
+terminate(_Why, _State) ->
+    ok.
+%% @doc gen_server code_change callback; the state is returned unchanged.
+code_change(_FromVsn, State, _Extra) ->
+    {ok, State}.
+%% @doc Record that `FdPid' uses `IOQPid'.
+%%
+%% The monitor table `Mons' holds three kinds of entries:
+%%   Ref             -> {FdPid, IOQPid}
+%%   {FdPid, IOQPid} -> Ref
+%%   IOQPid          -> [Ref]   (all monitor refs currently using the pid)
+%% `FdPid' is monitored at most once per `{FdPid, IOQPid}' pair. Always
+%% returns `ok'.
+add_monitor(Mons, FdPid, IOQPid) ->
+    PidKey = {FdPid, IOQPid},
+    Ref = case khash:get(Mons, PidKey, not_found) of
+        not_found ->
+            NewRef = erlang:monitor(process, FdPid),
+            khash:put(Mons, NewRef, PidKey),
+            khash:put(Mons, PidKey, NewRef),
+            %% The branch must evaluate to the reference itself. Ending on
+            %% khash:put/3 bound Ref to `ok', so the per-IOQ list held `ok'
+            %% and drop_monitor/2 could never empty it.
+            NewRef;
+        KnownRef ->
+            KnownRef
+    end,
+    Refs = khash:get(Mons, IOQPid, []),
+    case lists:member(Ref, Refs) of
+        true ->
+            ok;
+        false ->
+            khash:put(Mons, IOQPid, [Ref | Refs])
+    end,
+    ok.
+%% @doc Forget the monitor `Ref' after its process went down.
+%%
+%% Removes the `Ref' and `{FdPid, IOQPid}' entries from `Mons' and takes
+%% `Ref' out of the IOQ pid's reference list. When that list becomes empty
+%% the IOQ pid is unlinked, removed from `Mons' and sent an `idle' exit
+%% signal. Returns `{IOQPid, RemainingRefs}'.
+%% Throws `unexpected' if `Ref' or its IOQ pid is not in `Mons'.
+drop_monitor(Mons, Ref) when is_reference(Ref) ->
+    PidKey = case khash:get(Mons, Ref, not_found) of
+        not_found ->
+            %% TODO: shouldn't happen
+            throw(unexpected);
+        {_FdPid, _Pid}=Found ->
+            Found
+    end,
+    {_, IOQPid} = PidKey,
+    Refs = case khash:get(Mons, IOQPid, not_found) of
+        not_found ->
+            %% TODO: shouldn't happen
+            throw(unexpected);
+        Known ->
+            Known
+    end,
+    khash:del(Mons, Ref),
+    khash:del(Mons, PidKey),
+    Remaining = lists:delete(Ref, Refs),
+    case Remaining of
+        [] ->
+            %% Unlink before killing so the opener does not receive an
+            %% 'EXIT' message for this pid.
+            unlink(IOQPid),
+            khash:del(Mons, IOQPid),
+            exit(IOQPid, idle);
+        _ ->
+            khash:put(Mons, IOQPid, Remaining)
+    end,
+    {IOQPid, Remaining}.
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index 5e2e01f..1ed49c4 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -23,6 +23,8 @@
+    start_link/0,
+    start_link/1,
@@ -108,22 +110,17 @@ call(Fd, Msg, Dimensions) ->
                 [couchdb, io_queue2, RW, bypassed_count]),
             gen_server:call(Fd, Msg, infinity);
         _ ->
-            DispatchStrategy = config:get(
-                "ioq2", "dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER),
-            Server = case DispatchStrategy of
-                ?DISPATCH_RANDOM ->
-                    maybe_seed(),
-                    SID = random:uniform(erlang:system_info(schedulers)),
-                    ?SERVER_ID(SID);
-                ?DISPATCH_FD_HASH ->
-                    NumSchedulers = erlang:system_info(schedulers),
-                    SID = 1 + (erlang:phash2(Fd) rem NumSchedulers),
-                    ?SERVER_ID(SID);
-                ?DISPATCH_SINGLE_SERVER ->
-                    ?SERVER_ID(1);
-                _ ->
-                    SID = erlang:system_info(scheduler_id),
-                    ?SERVER_ID(SID)
+            Server = case ioq_opener:get_pid_for(Fd) of
+                undefined ->
+                    %%case ioq_opener:get_pid_for(Req#ioq_request.shard) of
+                    %%    undefined ->
+                    %%        ioq_server2;
+                    %%    IOQPid ->
+                    %%        IOQPid
+                    %%end;
+                    ioq_server2;
+                IOQPid ->
+                    IOQPid
             gen_server:call(Server, Req, infinity)
@@ -267,6 +264,18 @@ update_config() ->
     gen_server:call(?SERVER_ID(1), update_config, infinity).
+%% @doc Start the shared IOQ2 server, registered locally under this
+%% module's name.
+start_link() ->
+    start_link(?MODULE).
+%% @doc Start an IOQ2 server.
+%% The module name starts the locally registered shared server, whose init
+%% argument is `{global, ?MODULE}'. `{user, Name}' and `{shard, Name}'
+%% start an unregistered server with that tuple as its init argument.
+start_link(?MODULE) ->
+    InitArgs = [{global, ?MODULE}],
+    gen_server:start_link({local, ?MODULE}, ?MODULE, InitArgs, []);
+start_link({Kind, _Name}=Key) when Kind =:= user; Kind =:= shard ->
+    gen_server:start_link(?MODULE, [Key], []).
 start_link(Name, SID, Bind) ->
     Options = case Bind of
         true -> [{scheduler, SID}];
@@ -275,7 +284,7 @@ start_link(Name, SID, Bind) ->
     gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options).
-init([Name, SID]) ->
+init([{Type, Name}]) ->
     {ok, HQ} = hqueue:new(),
     {ok, Reqs} = khash:new(),
     {ok, Waiters} = khash:new(),
@@ -284,7 +293,7 @@ init([Name, SID]) ->
         reqs = Reqs,
         waiters = Waiters,
         server_name = Name,
-        scheduler_id = SID
+        scheduler_id = Type
     {ok, update_config_int(State)}.
@@ -644,7 +653,7 @@ mock_server(Config) ->
     meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
-    {ok, State} = ioq_server2:init([?SERVER_ID(1), 1]),
+    {ok, State} = ioq_server2:init([{global, ?SERVER_ID(1)}]),
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
index 7ea6284..1129a9a 100644
--- a/src/ioq_sup.erl
+++ b/src/ioq_sup.erl
@@ -23,27 +23,16 @@ start_link() ->
 init([]) ->
     ok = ioq_config_listener:subscribe(),
-    IOQ2Children = ioq_server2_children(),
     {ok, {
         {one_for_one, 5, 10},
+            ?CHILD(ioq_opener, worker),
             ?CHILD(ioq_server, worker),
+            ?CHILD(ioq_server2, worker),
             ?CHILD(ioq_osq, worker)
-            | IOQ2Children
-ioq_server2_children() ->
-    Bind = config:get_boolean("ioq2", "bind_to_schedulers", false),
-    ioq_server2_children(erlang:system_info(schedulers), Bind).
-ioq_server2_children(Count, Bind) ->
-    lists:map(fun(I) ->
-        Name = list_to_atom("ioq_server_" ++ integer_to_list(I)),
-        {Name, {ioq_server2, start_link, [Name, I, Bind]}, permanent, 5000, worker, [Name]}
-    end, lists:seq(1, Count)).
 get_ioq2_servers() ->
-    lists:map(fun(I) ->
-        list_to_atom("ioq_server_" ++ integer_to_list(I))
-    end, lists:seq(1, erlang:system_info(schedulers))).
+    [ioq_server2 | ioq_opener:get_ioq_pids()].
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl
index b6b7bad..c2502c5 100644
--- a/test/ioq_tests.erl
+++ b/test/ioq_tests.erl
@@ -40,7 +40,12 @@ instantiate({_, S}) ->
             check_call(S, make_ref(), priority(IOClass, Shard))
         end, shards())
     end, io_classes())},
-    ?_assertEqual(20, ioq:set_disk_concurrency(10)),
+    case ioq:ioq2_enabled() of
+        true ->
+            ?_assertEqual(1, ioq:set_disk_concurrency(10));
+        false ->
+            ?_assertEqual(20, ioq:set_disk_concurrency(10))
+    end,
     ?_assertError(badarg, ioq:set_disk_concurrency(0)),
     ?_assertError(badarg, ioq:set_disk_concurrency(-1)),
     ?_assertError(badarg, ioq:set_disk_concurrency(foo))].