You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2019/02/28 18:50:51 UTC

[couchdb-ioq] branch ioq-per-shard-or-user created (now c18f705)

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a change to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git.


      at c18f705  WIP: IOQ2 per shard/user

This branch includes the following new commits:

     new c18f705  WIP: IOQ2 per shard/user

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb-ioq] 01/01: WIP: IOQ2 per shard/user

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit c18f705eaf83a5d82ed1fe4d2373be44db70fb0c
Author: Russell Branca <ch...@apache.org>
AuthorDate: Thu Feb 28 18:48:21 2019 +0000

    WIP: IOQ2 per shard/user
---
 src/ioq.erl         |  24 ++++-
 src/ioq_opener.erl  | 257 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/ioq_server2.erl |  47 ++++++----
 src/ioq_sup.erl     |  17 +---
 test/ioq_tests.erl  |   7 +-
 5 files changed, 316 insertions(+), 36 deletions(-)

diff --git a/src/ioq.erl b/src/ioq.erl
index 160a448..2530dba 100644
--- a/src/ioq.erl
+++ b/src/ioq.erl
@@ -15,7 +15,12 @@
     get_disk_queues/0, get_osproc_queues/0, get_osproc_requests/0,
     get_disk_counters/0, get_disk_concurrency/0]).
 -export([
-    ioq2_enabled/0
+    ioq2_enabled/0,
+    fetch_pid_for/1,
+    fetch_pid_for/2,
+    fetch_pid_for/3,
+    get_pid_for/1,
+    set_pid_for/2
 ]).
 
 -define(APPS, [config, folsom, couch_stats, ioq]).
@@ -72,5 +77,20 @@ get_osproc_requests() ->
     gen_server:call(ioq_osq, get_requests).
 
 ioq2_enabled() ->
-    config:get_boolean("ioq2", "enabled", false).
+    config:get_boolean("ioq2", "enabled", true).
+
+%% Fetch the IOQ2 server pid for DbName, passing the calling process as the
+%% pid the opener should monitor. ioq_opener exports no fetch_pid_for/1, so
+%% delegate to the two-argument form rather than call an undefined function.
+fetch_pid_for(DbName) ->
+    ioq_opener:fetch_pid_for(DbName, self()).
+
+%% Thin wrapper: delegates to ioq_opener:fetch_pid_for/2 with the database
+%% name and the file-descriptor pid, returning whatever the opener returns.
+fetch_pid_for(DbName, FdPid) ->
+    ioq_opener:fetch_pid_for(DbName, FdPid).
+
+%% Thin wrapper: delegates to ioq_opener:fetch_pid_for/3, additionally
+%% forwarding the user context.
+fetch_pid_for(DbName, UserCtx, FdPid) ->
+    ioq_opener:fetch_pid_for(DbName, UserCtx, FdPid).
+
+%% Thin wrapper: delegates to ioq_opener:get_pid_for/1 to look up the IOQ pid
+%% previously associated with FdPid.
+get_pid_for(FdPid) ->
+    ioq_opener:get_pid_for(FdPid).
+
+%% Thin wrapper: delegates to ioq_opener:set_pid_for/2 to associate IOQPid
+%% with FdPid.
+set_pid_for(FdPid, IOQPid) ->
+    ioq_opener:set_pid_for(FdPid, IOQPid).
 
diff --git a/src/ioq_opener.erl b/src/ioq_opener.erl
new file mode 100644
index 0000000..e021608
--- /dev/null
+++ b/src/ioq_opener.erl
@@ -0,0 +1,257 @@
+-module(ioq_opener).
+-behavior(gen_server).
+
+
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+-export([
+    start_link/0,
+    %%fetch_pid_for/1,
+    fetch_pid_for/2,
+    fetch_pid_for/3,
+    get_pid_for/1,
+    set_pid_for/2,
+    get_ioq_pids/0,
+    get_pid_idx/0,
+    get_monitor_idx/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(BY_USER, by_user).
+-define(BY_SHARD, by_shard).
+-define(DEFAULT_DISPATCH, ?BY_SHARD).
+-define(PDICT_MARKER, ioq_pid_for).
+
+
+-record(st, {
+    idle = [] :: [{erlang:timestamp(), pid()}],
+    pid_idx :: khash:khash(),
+    monitors :: khash:khash(),
+    dispatch :: by_shard | by_user | undefined
+ }).
+
+
+%% HACK: experiment to allow for spawning IOQ2 pids prior to the spawning
+%% the associated couch_file pids
+%%fetch_pid_for(DbName) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, self()).
+
+
+%% TODO: cleanup the overloaded arity once experiments concluded
+%%fetch_pid_for(DbName, undefined) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, undefined, self());
+%%fetch_pid_for(DbName, #user_ctx{}=Ctx) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, Ctx, self());
+%% Fetch the IOQ pid for DbName without a user context; equivalent to
+%% fetch_pid_for(DbName, undefined, FdPid).
+fetch_pid_for(DbName, FdPid) when is_binary(DbName), is_pid(FdPid) ->
+    fetch_pid_for(DbName, undefined, FdPid).
+
+
+%% Synchronously ask the registered ioq_opener server for the IOQ pid that
+%% serves this DbName/UserCtx. The call uses an infinite timeout, so the
+%% caller blocks until the opener replies.
+fetch_pid_for(DbName, UserCtx, FdPid) when is_binary(DbName), is_pid(FdPid) ->
+    gen_server:call(?MODULE, {fetch, DbName, UserCtx, FdPid}, infinity).
+
+
+%% Look up the IOQ pid stored in the calling process's dictionary.
+%% Accepts `undefined` (returns `undefined`), a couch_file pid, or a database
+%% name binary. Returns whatever erlang:get/1 yields for the derived key.
+get_pid_for(undefined) ->
+    undefined;
+get_pid_for(FdPid) when is_pid(FdPid) ->
+    Key = {?PDICT_MARKER, FdPid},
+    erlang:get(Key);
+get_pid_for(DbName) when is_binary(DbName) ->
+    %% HACK: the key is the name with its extension stripped, matching the
+    %% shard format used by #ioq_request{}, so an IOQ pid registered before a
+    %% db handle existed can still be found afterwards.
+    Shard = filename:rootname(DbName),
+    erlang:get({?PDICT_MARKER, Shard}).
+
+
+%% Record IOQPid in the calling process's dictionary, keyed by either a
+%% couch_file pid or a database name binary. An `undefined` IOQPid is a no-op.
+%% Always returns ok.
+set_pid_for(_, undefined) ->
+    ok;
+set_pid_for(FdPid, IOQPid) when is_pid(FdPid), is_pid(IOQPid) ->
+    Key = {?PDICT_MARKER, FdPid},
+    erlang:put(Key, IOQPid),
+    ok;
+set_pid_for(DbName, IOQPid) when is_binary(DbName), is_pid(IOQPid) ->
+    %% HACK: key on the extension-less name, matching the shard format used
+    %% by #ioq_request{}, so the pid can be associated with a dbname before a
+    %% db handle exists.
+    Shard = filename:rootname(DbName),
+    erlang:put({?PDICT_MARKER, Shard}, IOQPid),
+    ok.
+
+
+%% Return the opener's pid index as a list (the server replies with
+%% khash:to_list/1 of its pid_idx table). Blocks with an infinite timeout.
+get_pid_idx() ->
+    gen_server:call(?MODULE, get_pid_idx, infinity).
+
+
+%% Return the opener's monitor table as a list (the server replies with
+%% khash:to_list/1 of its monitors table). Blocks with an infinite timeout.
+get_monitor_idx() ->
+    gen_server:call(?MODULE, get_monitor_idx, infinity).
+
+
+%% Collect the pids that appear as keys in the opener's pid index. The index
+%% holds both Name -> Pid and Pid -> Name entries; only the pid-keyed ones
+%% are kept. The result is in reverse index order.
+get_ioq_pids() ->
+    Entries = get_pid_idx(),
+    lists:reverse([Key || {Key, _Value} <- Entries, is_pid(Key)]).
+
+
+%% Start the opener as a linked gen_server registered locally under the
+%% module name.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+%% gen_server init: trap exits, create the two khash tables (pid index and
+%% monitor table) and read the dispatch mode from the "ioq.opener" config
+%% section.
+init([]) ->
+    process_flag(trap_exit, true),
+    {ok, PidIdx} = khash:new(),
+    {ok, Monitors} = khash:new(),
+    Setting = config:get("ioq.opener", "dispatch", undefined),
+    {ok, #st{
+        pid_idx = PidIdx,
+        monitors = Monitors,
+        dispatch = dispatch_from_config(Setting)
+    }}.
+
+
+%% Map the configured dispatch string onto a dispatch atom; anything
+%% unrecognised (including an unset value) selects the default.
+dispatch_from_config("by_shard") -> ?BY_SHARD;
+dispatch_from_config("by_user") -> ?BY_USER;
+dispatch_from_config(_) -> ?DEFAULT_DISPATCH.
+
+
+%% Synchronous requests.
+%%
+%% {fetch, DbName, UserCtx, FdPid}: find or start the IOQ server for the
+%%   request, record a monitor on the caller via add_monitor/3 and reply with
+%%   the IOQ pid. In by_user dispatch the server is keyed by the user name
+%%   from #user_ctx{}; in by_shard dispatch it is keyed by DbName. For every
+%%   started server both Key -> Pid and Pid -> Key are stored in pid_idx.
+%% get_pid_idx / get_monitor_idx: reply with the respective table as a list.
+%% Any other request is answered with ok.
+handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) ->
+    %% NOTE(review): the public fetch_pid_for/2,3 guard on is_pid(FdPid), so
+    %% the `undefined` branch looks unreachable through them. From is the
+    %% gen_server caller handle rather than a pid -- confirm add_monitor/3
+    %% (erlang:monitor/2) can accept it before relying on this branch.
+    Caller = case FdPid of
+        undefined -> From;
+        _ when is_pid(FdPid) -> FdPid
+    end,
+    Name = case UserCtx of
+        #user_ctx{name=Name0} -> Name0;
+        %% TODO: support unknown user
+        %% NOTE(review): this throw is raised inside a gen_server callback and
+        %% is not caught here; presumably it takes the opener down instead of
+        %% returning an error to the caller -- confirm.
+        undefined -> throw(unknown_user)
+    end,
+    IOQPid = case khash:get(St#st.pid_idx, Name, not_found) of
+        not_found ->
+            {ok, Pid} = ioq_server2:start_link({user, Name}),
+            khash:put(St#st.pid_idx, Name, Pid),
+            khash:put(St#st.pid_idx, Pid, Name),
+            Pid;
+        Pid ->
+            Pid
+    end,
+    ok = add_monitor(St#st.monitors, Caller, IOQPid),
+    {reply, IOQPid, St};
+handle_call({fetch, DbName, _UserCtx, FdPid}, From, #st{dispatch=?BY_SHARD}=St) ->
+    %% Same caller selection as the by_user clause; see the note there.
+    Caller = case FdPid of
+        undefined -> From;
+        _ when is_pid(FdPid) -> FdPid
+    end,
+    %% TODO: DbName = drop_compact_ext(DbName0),
+    IOQPid = case khash:get(St#st.pid_idx, DbName, not_found) of
+        not_found ->
+            {ok, Pid} = ioq_server2:start_link({shard, DbName}),
+            khash:put(St#st.pid_idx, DbName, Pid),
+            khash:put(St#st.pid_idx, Pid, DbName),
+            Pid;
+        Pid ->
+            Pid
+    end,
+    ok = add_monitor(St#st.monitors, Caller, IOQPid),
+    {reply, IOQPid, St};
+handle_call(get_pid_idx, _From, #st{}=St) ->
+    {reply, khash:to_list(St#st.pid_idx), St};
+handle_call(get_monitor_idx, _From, #st{}=St) ->
+    {reply, khash:to_list(St#st.monitors), St};
+handle_call(_, _From, St) ->
+    {reply, ok, St}.
+
+
+%% No casts are handled; every cast is ignored and the state is unchanged.
+handle_cast(_Msg, St) ->
+    {noreply, St}.
+
+
+%% Out-of-band messages.
+%%
+%% 'DOWN': a monitored caller died. drop_monitor/2 removes its reference; when
+%%   that was the last reference for the IOQ pid, both pid_idx entries for
+%%   that pid (Pid -> Name and Name -> Pid) are deleted as well.
+%% 'EXIT': a linked process exited. If it is a known IOQ pid, both of its
+%%   pid_idx entries are deleted. Only pid_idx is updated in this clause; the
+%%   monitors table is left untouched.
+%% Anything else is ignored.
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) ->
+    case drop_monitor(St#st.monitors, Ref) of
+        {IOQPid, []} ->
+            Name = khash:get(St#st.pid_idx, IOQPid), %% TODO: assert found?
+            khash:del(St#st.pid_idx, IOQPid),
+            khash:del(St#st.pid_idx, Name);
+        {_IOQPid, _Refs} ->
+            %% Other callers still reference this IOQ pid; keep it indexed.
+            ok
+    end,
+    {noreply, St};
+handle_info({'EXIT', Pid, _}, St) ->
+    case khash:get(St#st.pid_idx, Pid, not_found) of
+        not_found ->
+            %% TODO: shouldn't happen, throw error?
+            ok;
+        Name ->
+            khash:del(St#st.pid_idx, Pid),
+            khash:del(St#st.pid_idx, Name)
+    end,
+    {noreply, St};
+handle_info(_Info, St) ->
+    {noreply, St}.
+
+
+%% No cleanup is performed on shutdown.
+terminate(_Reason, _St) ->
+    ok.
+
+
+%% Hot code upgrade hook: the state is carried over unchanged.
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+%% Ensure FdPid is monitored on behalf of IOQPid and that the monitor
+%% reference is listed among IOQPid's references.
+%%
+%% Mons holds three kinds of entries:
+%%   Ref             -> {FdPid, IOQPid}
+%%   {FdPid, IOQPid} -> Ref
+%%   IOQPid          -> [Ref]
+%% A given {FdPid, IOQPid} pair is monitored at most once. Always returns ok.
+add_monitor(Mons, FdPid, IOQPid) ->
+    PidKey = {FdPid, IOQPid},
+    Ref = case khash:get(Mons, PidKey, not_found) of
+        not_found ->
+            NewRef = erlang:monitor(process, FdPid),
+            khash:put(Mons, NewRef, PidKey),
+            khash:put(Mons, PidKey, NewRef),
+            %% Yield the reference itself. Ending the branch on khash:put/3
+            %% would bind Ref to put's return value, so the per-IOQ list
+            %% below would never hold a real monitor reference and
+            %% drop_monitor/2 could never empty it.
+            NewRef;
+        KnownRef ->
+            KnownRef
+    end,
+    case khash:get(Mons, IOQPid, not_found) of
+        not_found ->
+            khash:put(Mons, IOQPid, [Ref]);
+        Refs ->
+            case lists:member(Ref, Refs) of
+                true ->
+                    ok;
+                false ->
+                    khash:put(Mons, IOQPid, [Ref | Refs])
+            end
+    end,
+    ok.
+
+
+%% Forget the monitor identified by Ref. Returns {IOQPid, RemainingRefs}.
+%% When no references remain, the IOQ pid is unlinked, removed from the
+%% table and sent an `idle` exit signal. Throws `unexpected` if Ref or its
+%% IOQ pid is not present in the table.
+drop_monitor(Mons, Ref) when is_reference(Ref) ->
+    case khash:get(Mons, Ref, not_found) of
+        not_found ->
+            %% TODO: shouldn't happen
+            throw(unexpected);
+        {_FdPid, IOQPid}=PidKey ->
+            Known = khash:get(Mons, IOQPid, not_found),
+            release_ref(Mons, Ref, PidKey, Known)
+    end.
+
+
+%% Remove Ref and its pid key from the table, then either reap the IOQ pid
+%% (no references left) or store the shortened reference list.
+release_ref(_Mons, _Ref, _PidKey, not_found) ->
+    %% TODO: shouldn't happen
+    throw(unexpected);
+release_ref(Mons, Ref, {_FdPid, IOQPid}=PidKey, Refs) ->
+    khash:del(Mons, Ref),
+    khash:del(Mons, PidKey),
+    case lists:delete(Ref, Refs) of
+        [] ->
+            unlink(IOQPid),
+            khash:del(Mons, IOQPid),
+            io:format("KILLING PID: ~p~n", [IOQPid]),
+            exit(IOQPid, idle),
+            {IOQPid, []};
+        Remaining ->
+            khash:put(Mons, IOQPid, Remaining),
+            {IOQPid, Remaining}
+    end.
+
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index 5e2e01f..1ed49c4 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -23,6 +23,8 @@
     code_change/3
 ]).
 -export([
+    start_link/0,
+    start_link/1,
     start_link/3,
     call/3,
     pcall/1,
@@ -108,22 +110,17 @@ call(Fd, Msg, Dimensions) ->
                 [couchdb, io_queue2, RW, bypassed_count]),
             gen_server:call(Fd, Msg, infinity);
         _ ->
-            DispatchStrategy = config:get(
-                "ioq2", "dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER),
-            Server = case DispatchStrategy of
-                ?DISPATCH_RANDOM ->
-                    maybe_seed(),
-                    SID = random:uniform(erlang:system_info(schedulers)),
-                    ?SERVER_ID(SID);
-                ?DISPATCH_FD_HASH ->
-                    NumSchedulers = erlang:system_info(schedulers),
-                    SID = 1 + (erlang:phash2(Fd) rem NumSchedulers),
-                    ?SERVER_ID(SID);
-                ?DISPATCH_SINGLE_SERVER ->
-                    ?SERVER_ID(1);
-                _ ->
-                    SID = erlang:system_info(scheduler_id),
-                    ?SERVER_ID(SID)
+            Server = case ioq_opener:get_pid_for(Fd) of
+                undefined ->
+                    %%case ioq_opener:get_pid_for(Req#ioq_request.shard) of
+                    %%    undefined ->
+                    %%        ioq_server2;
+                    %%    IOQPid ->
+                    %%        IOQPid
+                    %%end;
+                    ioq_server2;
+                IOQPid ->
+                    IOQPid
             end,
             gen_server:call(Server, Req, infinity)
     end.
@@ -267,6 +264,18 @@ update_config() ->
     gen_server:call(?SERVER_ID(1), update_config, infinity).
 
 
+%% Start the default IOQ2 server; equivalent to start_link(?MODULE).
+start_link() ->
+    start_link(?MODULE).
+
+
+%% Start an IOQ2 server.
+%%   {user, Name} | {shard, Name}: an unregistered server whose init argument
+%%     is the tagged tuple itself.
+%%   ?MODULE: the server registered locally under the module name,
+%%     initialised as {global, ?MODULE}.
+start_link({Type, _Name}=Key) when Type =:= user; Type =:= shard ->
+    gen_server:start_link(?MODULE, [Key], []);
+start_link(?MODULE) ->
+    InitArgs = [{global, ?MODULE}],
+    gen_server:start_link({local, ?MODULE}, ?MODULE, InitArgs, []).
+
+
 start_link(Name, SID, Bind) ->
     Options = case Bind of
         true -> [{scheduler, SID}];
@@ -275,7 +284,7 @@ start_link(Name, SID, Bind) ->
     gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options).
 
 
-init([Name, SID]) ->
+init([{Type, Name}]) ->
     {ok, HQ} = hqueue:new(),
     {ok, Reqs} = khash:new(),
     {ok, Waiters} = khash:new(),
@@ -284,7 +293,7 @@ init([Name, SID]) ->
         reqs = Reqs,
         waiters = Waiters,
         server_name = Name,
-        scheduler_id = SID
+        scheduler_id = Type
     },
     {ok, update_config_int(State)}.
 
@@ -644,7 +653,7 @@ mock_server(Config) ->
     meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
         Default
     end),
-    {ok, State} = ioq_server2:init([?SERVER_ID(1), 1]),
+    {ok, State} = ioq_server2:init([{global, ?SERVER_ID(1)}]),
     State.
 
 
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
index 7ea6284..1129a9a 100644
--- a/src/ioq_sup.erl
+++ b/src/ioq_sup.erl
@@ -23,27 +23,16 @@ start_link() ->
 
 init([]) ->
     ok = ioq_config_listener:subscribe(),
-    IOQ2Children = ioq_server2_children(),
     {ok, {
         {one_for_one, 5, 10},
         [
+            ?CHILD(ioq_opener, worker),
             ?CHILD(ioq_server, worker),
+            ?CHILD(ioq_server2, worker),
             ?CHILD(ioq_osq, worker)
-            | IOQ2Children
         ]
     }}.
 
-ioq_server2_children() ->
-    Bind = config:get_boolean("ioq2", "bind_to_schedulers", false),
-    ioq_server2_children(erlang:system_info(schedulers), Bind).
-
-ioq_server2_children(Count, Bind) ->
-    lists:map(fun(I) ->
-        Name = list_to_atom("ioq_server_" ++ integer_to_list(I)),
-        {Name, {ioq_server2, start_link, [Name, I, Bind]}, permanent, 5000, worker, [Name]}
-    end, lists:seq(1, Count)).
 
 get_ioq2_servers() ->
-    lists:map(fun(I) ->
-        list_to_atom("ioq_server_" ++ integer_to_list(I))
-    end, lists:seq(1, erlang:system_info(schedulers))).
+    [ioq_server2 | ioq_opener:get_ioq_pids()].
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl
index b6b7bad..c2502c5 100644
--- a/test/ioq_tests.erl
+++ b/test/ioq_tests.erl
@@ -40,7 +40,12 @@ instantiate({_, S}) ->
             check_call(S, make_ref(), priority(IOClass, Shard))
         end, shards())
     end, io_classes())},
-    ?_assertEqual(20, ioq:set_disk_concurrency(10)),
+    case ioq:ioq2_enabled() of
+        true ->
+            ?_assertEqual(1, ioq:set_disk_concurrency(10));
+        false ->
+            ?_assertEqual(20, ioq:set_disk_concurrency(10))
+    end,
     ?_assertError(badarg, ioq:set_disk_concurrency(0)),
     ?_assertError(badarg, ioq:set_disk_concurrency(-1)),
     ?_assertError(badarg, ioq:set_disk_concurrency(foo))].