You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2021/08/24 21:54:53 UTC

[couchdb-ioq] branch ioq-per-shard-or-user updated (d2c2a43 -> 55c8609)

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a change to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git.


 discard d2c2a43  Add license blurb to ioq_opener
 discard acd776b  Support finding ioq pids by #ioq_request{}
 discard c5e91ca  HACK: explode loudly on missing io_priority
 discard 1dee640  WIP: IOQ2 per shard/user
     add b5b801a  Reconfigure IOQ on config update
     add 89fe01f  Merge pull request #8 from cloudant/update_handle_config_terminate
     add 8dfe695  Use cast to send update_config to ioq2
     add 1b2f8ca  Merge pull request #9 from cloudant/use-cast-for-ioq2-update_config
     add 63617af  Fix compiler warnings
     add 103cf1c  Merge pull request #10 from cloudant/fix-compilation-warnings
     add 471e6c2  Make PropEr an optional (test) dependency
     add af4f976  Merge pull request #11 from apache/optional-proper
     add 9b4c5d8  Start couch_log in tests
     add 7edcb03  Merge pull request #12 from cloudant/start-couch_log-in-test
     add 08c6bbb  Rearrange IOQ config declarations into header file
     add d6e7a78  Allow dynamic classes in is_valid_class
     add c0ce268  Add dynamic IOQ class documentation
     add 8ada5fa  Merge pull request #13 from apache/allow-for-dynamic-ioq-classes
     add b1ce76d  Log from 0 is undefined
     add 5ff5921  Merge pull request #15 from apache/ioq_update_hist
     add 4843f37  Reduce logging noise
     new 54cb8ab  WIP: IOQ2 per shard/user
     new b780a61  HACK: explode loudly on missing io_priority
     new 034db83  Support finding ioq pids by #ioq_request{}
     new 05f6ed0  Add license blurb to ioq_opener
     new a84ec63  Don't crash on no io_priority
     new 41cd71c  Support IOQ2 pid per couch_file
     new 55c8609  Add ioq helper functions

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and produces a
repository history that looks something like this:

 * -- * -- B -- O -- O -- O   (d2c2a43)
            \
             N -- N -- N   refs/heads/ioq-per-shard-or-user (55c8609)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 IOQ2.md                   |  10 ++++
 include/ioq.hrl           |  16 ++++++
 rebar.config              |  11 ----
 rebar.config.script       |  18 ++++++
 src/ioq.erl               |  16 ++++++
 src/ioq_config.erl        |  23 ++++----
 src/ioq_osq.erl           |   7 ++-
 src/ioq_server.erl        |  20 ++++---
 src/ioq_server2.erl       |  85 +++++++++++++++++------------
 src/ioq_sup.erl           |  55 ++++++++++++++++++-
 test/ioq_config_tests.erl | 136 ++++++++++++++++++++++++++++++++++++++++++++++
 test/ioq_kv_tests.erl     |   2 +
 12 files changed, 330 insertions(+), 69 deletions(-)
 delete mode 100644 rebar.config
 create mode 100644 rebar.config.script

[couchdb-ioq] 06/07: Support IOQ2 pid per couch_file

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 41cd71c360bef825566b311b64539dfdc2ab5e70
Author: Russell Branca <ch...@apache.org>
AuthorDate: Tue Aug 17 15:11:34 2021 -0700

    Support IOQ2 pid per couch_file
---
 include/ioq.hrl     |  7 ++++++
 src/ioq_server2.erl | 65 ++++++++++++++++++++++++++++++++++++-----------------
 2 files changed, 52 insertions(+), 20 deletions(-)

diff --git a/include/ioq.hrl b/include/ioq.hrl
index 499a140..0577579 100644
--- a/include/ioq.hrl
+++ b/include/ioq.hrl
@@ -61,6 +61,13 @@
 }).
 
 
+-record(ioq_file, {
+    fd,
+    ioq,
+    tab
+}).
+
+
 -type io_priority() :: db_compact
     | db_update
     | interactive
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index 654b8c8..7269368 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -66,7 +66,8 @@
     next_key = 1 :: pos_integer(),
     server_name :: atom(),
     scheduler_id = 0 :: non_neg_integer(),
-    max_priority = ?DEFAULT_MAX_PRIORITY :: float()
+    max_priority = ?DEFAULT_MAX_PRIORITY :: float(),
+    fd :: pid() | undefined
 }).
 
 
@@ -102,6 +103,11 @@ call(Fd, Msg, Dimensions) ->
         t0 = os:timestamp()
     },
     Req = add_request_dimensions(Req0, Dimensions),
+    call_int(Fd, Req).
+
+%% TODO: handle Clouseau requests with isolated IOQ2 pid
+%%call_int(#ioq_file{fd={clouseau, _}=IOF, Req) ->
+call_int(#ioq_file{ioq=undefined, fd=Fd}, #ioq_request{msg=Msg}=Req) ->
     Class = atom_to_list(Req#ioq_request.class),
     case config:get_boolean("ioq2.bypass", Class, false) of
         true ->
@@ -111,21 +117,24 @@ call(Fd, Msg, Dimensions) ->
                 [couchdb, io_queue2, RW, bypassed_count]),
             gen_server:call(Fd, Msg, infinity);
         _ ->
-            Server = case ioq_opener:get_pid_for(Fd) of
-                undefined ->
-                    IOQPid = case ioq_opener:fetch_pid_for(Req) of
-                        undefined ->
-                            ioq_server2;
-                        IOQPid0 ->
-                            IOQPid0
-                    end,
-                    ioq_opener:set_pid_for(Fd, IOQPid),
-                    IOQPid;
-                IOQPid ->
-                    IOQPid
-            end,
+            Server = ioq_server2,
+            %%Server = case ioq_opener:get_pid_for(Fd) of
+            %%    undefined ->
+            %%        IOQPid = case ioq_opener:fetch_pid_for(Req) of
+            %%            undefined ->
+            %%                ioq_server2;
+            %%            IOQPid0 ->
+            %%                IOQPid0
+            %%        end,
+            %%        ioq_opener:set_pid_for(Fd, IOQPid),
+            %%        IOQPid;
+            %%    IOQPid ->
+            %%        IOQPid
+            %%end,
             gen_server:call(Server, Req, infinity)
-    end.
+    end;
+call_int(#ioq_file{ioq=IOP}, #ioq_request{}=Req) ->
+    gen_server:call(IOP, Req, infinity).
 
 
 -spec pcall(any()) -> any().
@@ -275,6 +284,8 @@ start_link(?MODULE) ->
 start_link({by_user, _Name}=User) ->
     gen_server:start_link(?MODULE, [User], []);
 start_link({by_shard, _Name}=Shard) ->
+    gen_server:start_link(?MODULE, [Shard], []);
+start_link({by_shard, _Name, _Fd}=Shard) ->
     gen_server:start_link(?MODULE, [Shard], []).
 
 
@@ -286,16 +297,25 @@ start_link(Name, SID, Bind) ->
     gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options).
 
 
-init([{Type, Name}]) ->
+init([Init]) ->
+    {Type, Name, Fd} = case Init of
+        {_, _, FdPid} = Init0 ->
+            erlang:link(FdPid),
+            Init0;
+        {Type0, Name0} ->
+            {Type0, Name0, undefined}
+    end,
     {ok, HQ} = hqueue:new(),
     {ok, Reqs} = khash:new(),
     {ok, Waiters} = khash:new(),
+    erlang:put(couch_file, Name),
     State = #state{
         queue = HQ,
         reqs = Reqs,
         waiters = Waiters,
         server_name = Name,
-        scheduler_id = Type
+        scheduler_id = Type,
+        fd = Fd
     },
     {ok, update_config_int(State)}.
 
@@ -328,8 +348,8 @@ handle_call(get_pending_reqs, _From, #state{queue=HQ}=State) ->
     {reply, hqueue:to_list(HQ), State, 0};
 handle_call(get_counters, _From, State) ->
     {reply, undefined, State, 0};
-handle_call(_, _From, State) ->
-    {reply, ok, State, 0}.
+handle_call(Msg, _From, State) ->
+    {reply, {error, unknown, Msg}, State, 0}.
 
 
 handle_cast(update_config, State) ->
@@ -444,13 +464,18 @@ submit_request(Req, #state{iterations=I, resize_limit=RL}=State) when I >= RL ->
     submit_request(Req, State#state{iterations=0});
 submit_request(Req, #state{iterations=Iterations}=State) ->
     #ioq_request{
-        fd = Fd,
+        fd = Fd0,
         msg = Call,
         class = Class,
         t0 = T0
     } = Req,
     #state{reqs = Reqs} = State,
 
+    Fd = case Fd0 of
+        #ioq_file{fd=Fd1} -> Fd1;
+        _                 -> Fd0
+    end,
+
     % make the request
     Ref = erlang:monitor(process, Fd),
     Fd ! {'$gen_call', {self(), Ref}, Call},

[couchdb-ioq] 02/07: HACK: explode loudly on missing io_priority

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit b780a6152c7261e2deb55725eae05565e5008a4f
Author: Russell Branca <ch...@apache.org>
AuthorDate: Wed Mar 6 19:14:04 2019 +0000

    HACK: explode loudly on missing io_priority
---
 src/ioq_server2.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index b3dc24f..595babe 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -94,7 +94,7 @@
 
 
 -spec call(pid(), term(), io_dimensions()) -> term().
-call(Fd, Msg, Dimensions) ->
+call(Fd, Msg, Dimensions) when Dimensions =/= undefined ->
     Req0 = #ioq_request{
         fd = Fd,
         msg = Msg,

[couchdb-ioq] 01/07: WIP: IOQ2 per shard/user

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 54cb8abf851ef905d1620a198807aafe243f6c42
Author: Russell Branca <ch...@apache.org>
AuthorDate: Thu Feb 28 18:48:21 2019 +0000

    WIP: IOQ2 per shard/user
---
 src/ioq.erl         |  24 ++++-
 src/ioq_opener.erl  | 256 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/ioq_server2.erl |  46 ++++++----
 src/ioq_sup.erl     |  17 ++--
 test/ioq_tests.erl  |   7 +-
 5 files changed, 318 insertions(+), 32 deletions(-)

diff --git a/src/ioq.erl b/src/ioq.erl
index 160a448..2530dba 100644
--- a/src/ioq.erl
+++ b/src/ioq.erl
@@ -15,7 +15,12 @@
     get_disk_queues/0, get_osproc_queues/0, get_osproc_requests/0,
     get_disk_counters/0, get_disk_concurrency/0]).
 -export([
-    ioq2_enabled/0
+    ioq2_enabled/0,
+    fetch_pid_for/1,
+    fetch_pid_for/2,
+    fetch_pid_for/3,
+    get_pid_for/1,
+    set_pid_for/2
 ]).
 
 -define(APPS, [config, folsom, couch_stats, ioq]).
@@ -72,5 +77,20 @@ get_osproc_requests() ->
     gen_server:call(ioq_osq, get_requests).
 
 ioq2_enabled() ->
-    config:get_boolean("ioq2", "enabled", false).
+    config:get_boolean("ioq2", "enabled", true).
+
+fetch_pid_for(DbName) ->
+    ioq_opener:fetch_pid_for(DbName).
+
+fetch_pid_for(DbName, FdPid) ->
+    ioq_opener:fetch_pid_for(DbName, FdPid).
+
+fetch_pid_for(DbName, UserCtx, FdPid) ->
+    ioq_opener:fetch_pid_for(DbName, UserCtx, FdPid).
+
+get_pid_for(FdPid) ->
+    ioq_opener:get_pid_for(FdPid).
+
+set_pid_for(FdPid, IOQPid) ->
+    ioq_opener:set_pid_for(FdPid, IOQPid).
 
diff --git a/src/ioq_opener.erl b/src/ioq_opener.erl
new file mode 100644
index 0000000..d85991e
--- /dev/null
+++ b/src/ioq_opener.erl
@@ -0,0 +1,256 @@
+-module(ioq_opener).
+-behavior(gen_server).
+
+
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+-export([
+    start_link/0,
+    %%fetch_pid_for/1,
+    fetch_pid_for/2,
+    fetch_pid_for/3,
+    get_pid_for/1,
+    set_pid_for/2,
+    get_ioq_pids/0,
+    get_pid_idx/0,
+    get_monitor_idx/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(BY_USER, by_user).
+-define(BY_SHARD, by_shard).
+-define(DEFAULT_DISPATCH, ?BY_SHARD).
+-define(PDICT_MARKER, ioq_pid_for).
+
+
+-record(st, {
+    idle = [] :: [{erlang:timestamp(), pid()}],
+    pid_idx :: khash:khash(),
+    monitors :: khash:khash(),
+    dispatch :: by_shard | by_user | undefined
+ }).
+
+
+%% HACK: experiment to allow for spawning IOQ2 pids prior to spawning
+%% the associated couch_file pids
+%%fetch_pid_for(DbName) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, self()).
+
+
+%% TODO: cleanup the overloaded arity once experiments concluded
+%%fetch_pid_for(DbName, undefined) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, undefined, self());
+%%fetch_pid_for(DbName, #user_ctx{}=Ctx) when is_binary(DbName) ->
+%%    fetch_pid_for(DbName, Ctx, self());
+fetch_pid_for(DbName, FdPid) when is_binary(DbName), is_pid(FdPid) ->
+    fetch_pid_for(DbName, undefined, FdPid).
+
+
+fetch_pid_for(DbName, UserCtx, FdPid) when is_binary(DbName), is_pid(FdPid) ->
+    gen_server:call(?MODULE, {fetch, DbName, UserCtx, FdPid}, infinity).
+
+
+get_pid_for(undefined) ->
+    undefined;
+get_pid_for(DbName) when is_binary(DbName) ->
+    %% HACK: use the same shard format as per #ioq_request{} to post facto
+    %% associate an IOQ pid with a dbname for when we set the IOQ2 pid prior
+    %% to having a db handle
+    erlang:get({?PDICT_MARKER, filename:rootname(DbName)});
+get_pid_for(FdPid) when is_pid(FdPid) ->
+    erlang:get({?PDICT_MARKER, FdPid}).
+
+
+set_pid_for(_, undefined) ->
+    ok;
+set_pid_for(DbName, IOQPid) when is_binary(DbName), is_pid(IOQPid) ->
+    %% HACK: use the same shard format as per #ioq_request{} to post facto
+    %% associate an IOQ pid with a dbname for when we set the IOQ2 pid prior
+    %% to having a db handle
+    erlang:put({?PDICT_MARKER, filename:rootname(DbName)}, IOQPid),
+    ok;
+set_pid_for(FdPid, IOQPid) when is_pid(FdPid), is_pid(IOQPid) ->
+    erlang:put({?PDICT_MARKER, FdPid}, IOQPid),
+    ok.
+
+
+get_pid_idx() ->
+    gen_server:call(?MODULE, get_pid_idx, infinity).
+
+
+get_monitor_idx() ->
+    gen_server:call(?MODULE, get_monitor_idx, infinity).
+
+
+get_ioq_pids() ->
+    lists:foldl(
+      fun
+        ({K, _V}, Acc) when is_pid(K) ->
+              [K | Acc];
+        (_, Acc) ->
+              Acc
+      end, [], get_pid_idx()).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init([]) ->
+    process_flag(trap_exit, true),
+    {ok, PidIdx} = khash:new(),
+    {ok, Monitors} = khash:new(),
+    Dispatch = case config:get("ioq.opener", "dispatch", undefined) of
+        "by_shard" -> ?BY_SHARD;
+        "by_user" -> ?BY_USER;
+        _ -> ?DEFAULT_DISPATCH
+    end,
+    St = #st{
+        pid_idx = PidIdx,
+        monitors = Monitors,
+        dispatch = Dispatch
+    },
+    {ok, St}.
+
+
+handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) ->
+    Caller = case FdPid of
+        undefined -> From;
+        _ when is_pid(FdPid) -> FdPid
+    end,
+    Name = case UserCtx of
+        #user_ctx{name=Name0} -> Name0;
+        %% TODO: support unknown user
+        undefined -> throw(unknown_user)
+    end,
+    IOQPid = case khash:get(St#st.pid_idx, Name, not_found) of
+        not_found ->
+            {ok, Pid} = ioq_server2:start_link({user, Name}),
+            khash:put(St#st.pid_idx, Name, Pid),
+            khash:put(St#st.pid_idx, Pid, Name),
+            Pid;
+        Pid ->
+            Pid
+    end,
+    ok = add_monitor(St#st.monitors, Caller, IOQPid),
+    {reply, IOQPid, St};
+handle_call({fetch, DbName, _UserCtx, FdPid}, From, #st{dispatch=?BY_SHARD}=St) ->
+    Caller = case FdPid of
+        undefined -> From;
+        _ when is_pid(FdPid) -> FdPid
+    end,
+    %% TODO: DbName = drop_compact_ext(DbName0),
+    IOQPid = case khash:get(St#st.pid_idx, DbName, not_found) of
+        not_found ->
+            {ok, Pid} = ioq_server2:start_link({shard, DbName}),
+            khash:put(St#st.pid_idx, DbName, Pid),
+            khash:put(St#st.pid_idx, Pid, DbName),
+            Pid;
+        Pid ->
+            Pid
+    end,
+    ok = add_monitor(St#st.monitors, Caller, IOQPid),
+    {reply, IOQPid, St};
+handle_call(get_pid_idx, _From, #st{}=St) ->
+    {reply, khash:to_list(St#st.pid_idx), St};
+handle_call(get_monitor_idx, _From, #st{}=St) ->
+    {reply, khash:to_list(St#st.monitors), St};
+handle_call(_, _From, St) ->
+    {reply, ok, St}.
+
+
+handle_cast(_Msg, St) ->
+    {noreply, St}.
+
+
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) ->
+    case drop_monitor(St#st.monitors, Ref) of
+        {IOQPid, []} ->
+            Name = khash:get(St#st.pid_idx, IOQPid), %% TODO: assert found?
+            khash:del(St#st.pid_idx, IOQPid),
+            khash:del(St#st.pid_idx, Name);
+        {_IOQPid, _Refs} ->
+            ok
+    end,
+    {noreply, St};
+handle_info({'EXIT', Pid, _}, St) ->
+    case khash:get(St#st.pid_idx, Pid, not_found) of
+        not_found ->
+            %% TODO: shouldn't happen, throw error?
+            ok;
+        Name ->
+            khash:del(St#st.pid_idx, Pid),
+            khash:del(St#st.pid_idx, Name)
+    end,
+    {noreply, St};
+handle_info(_Info, St) ->
+    {noreply, St}.
+
+
+terminate(_Reason, _St) ->
+    ok.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+add_monitor(Mons, FdPid, IOQPid) ->
+    PidKey = {FdPid, IOQPid},
+    Ref = case khash:get(Mons, PidKey, not_found) of
+        not_found ->
+            Ref0 = erlang:monitor(process, FdPid),
+            khash:put(Mons, Ref0, PidKey),
+            khash:put(Mons, PidKey, Ref0);
+        Ref0 ->
+            Ref0
+    end,
+    case khash:get(Mons, IOQPid, not_found) of
+        not_found ->
+            khash:put(Mons, IOQPid, [Ref]);
+        Refs ->
+            case lists:member(Ref, Refs) of
+                true ->
+                    ok;
+                false ->
+                    khash:put(Mons, IOQPid, [Ref | Refs])
+            end
+    end,
+    ok.
+
+
+drop_monitor(Mons, Ref) when is_reference(Ref) ->
+    case khash:get(Mons, Ref, not_found) of
+        not_found ->
+            %% TODO: shouldn't happen
+            throw(unexpected);
+        {_FdPid, IOQPid}=PidKey ->
+            case khash:get(Mons, IOQPid, not_found) of
+                not_found ->
+                    %% TODO: shouldn't happen
+                    throw(unexpected);
+                Refs ->
+                    khash:del(Mons, Ref),
+                    khash:del(Mons, PidKey),
+                    case lists:delete(Ref, Refs) of
+                        [] ->
+                            unlink(IOQPid),
+                            khash:del(Mons, IOQPid),
+                            exit(IOQPid, idle),
+                            {IOQPid, []};
+                        Refs1 ->
+                            khash:put(Mons, IOQPid, Refs1),
+                            {IOQPid, Refs1}
+                    end
+            end
+    end.
+
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index dadd44d..b3dc24f 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -23,6 +23,8 @@
     code_change/3
 ]).
 -export([
+    start_link/0,
+    start_link/1,
     start_link/3,
     call/3,
     pcall/1,
@@ -108,21 +110,17 @@ call(Fd, Msg, Dimensions) ->
                 [couchdb, io_queue2, RW, bypassed_count]),
             gen_server:call(Fd, Msg, infinity);
         _ ->
-            DispatchStrategy = config:get(
-                "ioq2", "dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER),
-            Server = case DispatchStrategy of
-                ?DISPATCH_RANDOM ->
-                    SID = rand:uniform(erlang:system_info(schedulers)),
-                    ?SERVER_ID(SID);
-                ?DISPATCH_FD_HASH ->
-                    NumSchedulers = erlang:system_info(schedulers),
-                    SID = 1 + (erlang:phash2(Fd) rem NumSchedulers),
-                    ?SERVER_ID(SID);
-                ?DISPATCH_SINGLE_SERVER ->
-                    ?SERVER_ID(1);
-                _ ->
-                    SID = erlang:system_info(scheduler_id),
-                    ?SERVER_ID(SID)
+            Server = case ioq_opener:get_pid_for(Fd) of
+                undefined ->
+                    %%case ioq_opener:get_pid_for(Req#ioq_request.shard) of
+                    %%    undefined ->
+                    %%        ioq_server2;
+                    %%    IOQPid ->
+                    %%        IOQPid
+                    %%end;
+                    ioq_server2;
+                IOQPid ->
+                    IOQPid
             end,
             gen_server:call(Server, Req, infinity)
     end.
@@ -266,6 +264,18 @@ update_config() ->
     gen_server:call(?SERVER_ID(1), update_config, infinity).
 
 
+start_link() ->
+    start_link(?MODULE).
+
+
+start_link(?MODULE) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [{global, ?MODULE}], []);
+start_link({user, _Name}=User) ->
+    gen_server:start_link(?MODULE, [User], []);
+start_link({shard, _Name}=Shard) ->
+    gen_server:start_link(?MODULE, [Shard], []).
+
+
 start_link(Name, SID, Bind) ->
     Options = case Bind of
         true -> [{scheduler, SID}];
@@ -274,7 +284,7 @@ start_link(Name, SID, Bind) ->
     gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options).
 
 
-init([Name, SID]) ->
+init([{Type, Name}]) ->
     {ok, HQ} = hqueue:new(),
     {ok, Reqs} = khash:new(),
     {ok, Waiters} = khash:new(),
@@ -283,7 +293,7 @@ init([Name, SID]) ->
         reqs = Reqs,
         waiters = Waiters,
         server_name = Name,
-        scheduler_id = SID
+        scheduler_id = Type
     },
     {ok, update_config_int(State)}.
 
@@ -630,7 +640,7 @@ mock_server(Config) ->
     meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
         Default
     end),
-    {ok, State} = ioq_server2:init([?SERVER_ID(1), 1]),
+    {ok, State} = ioq_server2:init([{global, ?SERVER_ID(1)}]),
     State.
 
 
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
index 8bfda77..973886a 100644
--- a/src/ioq_sup.erl
+++ b/src/ioq_sup.erl
@@ -15,7 +15,7 @@
 -vsn(1).
 -behaviour(config_listener).
 -export([start_link/0, init/1]).
--export([get_ioq2_servers/0]).
+-export([get_ioq2_servers/0, get_ioq2_servers_new/0]).
 -export([handle_config_change/5, handle_config_terminate/3]).
 -export([processes/1]).
 
@@ -28,26 +28,20 @@ start_link() ->
 
 init([]) ->
     ok = ioq_config_listener:subscribe(),
-    IOQ2Children = ioq_server2_children(),
     {ok, {
         {one_for_one, 5, 10},
         [
             ?CHILD_WITH_ARGS(config_listener_mon, worker, [?MODULE, nil]),
+            ?CHILD(ioq_opener, worker),
             ?CHILD(ioq_server, worker),
+            ?CHILD(ioq_server2, worker),
             ?CHILD(ioq_osq, worker)
-            | IOQ2Children
         ]
     }}.
 
-ioq_server2_children() ->
-    Bind = config:get_boolean("ioq2", "bind_to_schedulers", false),
-    ioq_server2_children(erlang:system_info(schedulers), Bind).
 
-ioq_server2_children(Count, Bind) ->
-    lists:map(fun(I) ->
-        Name = list_to_atom("ioq_server_" ++ integer_to_list(I)),
-        {Name, {ioq_server2, start_link, [Name, I, Bind]}, permanent, 5000, worker, [Name]}
-    end, lists:seq(1, Count)).
+get_ioq2_servers_new() ->
+    [ioq_server2 | ioq_opener:get_ioq_pids()].
 
 get_ioq2_servers() ->
     lists:map(fun(I) ->
@@ -92,3 +86,4 @@ filter_children(RegExp) ->
             _ -> false
         end
     end, supervisor:which_children(?MODULE)).
+
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl
index b6b7bad..c2502c5 100644
--- a/test/ioq_tests.erl
+++ b/test/ioq_tests.erl
@@ -40,7 +40,12 @@ instantiate({_, S}) ->
             check_call(S, make_ref(), priority(IOClass, Shard))
         end, shards())
     end, io_classes())},
-    ?_assertEqual(20, ioq:set_disk_concurrency(10)),
+    case ioq:ioq2_enabled() of
+        true ->
+            ?_assertEqual(1, ioq:set_disk_concurrency(10));
+        false ->
+            ?_assertEqual(20, ioq:set_disk_concurrency(10))
+    end,
     ?_assertError(badarg, ioq:set_disk_concurrency(0)),
     ?_assertError(badarg, ioq:set_disk_concurrency(-1)),
     ?_assertError(badarg, ioq:set_disk_concurrency(foo))].

[couchdb-ioq] 05/07: Don't crash on no io_priority

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit a84ec63f25714c1ea4d3788cfba021035ed47f47
Author: Russell Branca <ch...@apache.org>
AuthorDate: Fri Aug 13 12:58:22 2021 -0700

    Don't crash on no io_priority
---
 src/ioq_server2.erl | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index 85e3644..654b8c8 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -94,7 +94,8 @@
 
 
 -spec call(pid(), term(), io_dimensions()) -> term().
-call(Fd, Msg, Dimensions) when Dimensions =/= undefined ->
+%%call(Fd, Msg, Dimensions) when Dimensions =/= undefined ->
+call(Fd, Msg, Dimensions) ->
     Req0 = #ioq_request{
         fd = Fd,
         msg = Msg,

[couchdb-ioq] 04/07: Add license blurb to ioq_opener

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 05f6ed08c21a3b8c6b5b8c26816182dbff4aead7
Author: Russell Branca <ch...@apache.org>
AuthorDate: Wed Mar 13 21:43:11 2019 +0000

    Add license blurb to ioq_opener
---
 src/ioq_opener.erl | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/src/ioq_opener.erl b/src/ioq_opener.erl
index 5bcf065..ebefc73 100644
--- a/src/ioq_opener.erl
+++ b/src/ioq_opener.erl
@@ -1,3 +1,15 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
 -module(ioq_opener).
 -behavior(gen_server).
 

[couchdb-ioq] 03/07: Support finding ioq pids by #ioq_request{}

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 034db83e7a56a5a5c14ec834c5a3160a960f940a
Author: Russell Branca <ch...@apache.org>
AuthorDate: Wed Mar 6 23:03:55 2019 +0000

    Support finding ioq pids by #ioq_request{}
---
 src/ioq_opener.erl  | 53 +++++++++++++++++++++++++++++++++++++++++++++--------
 src/ioq_server2.erl | 28 ++++++++++++----------------
 test/ioq_tests.erl  |  6 ++++--
 3 files changed, 61 insertions(+), 26 deletions(-)

diff --git a/src/ioq_opener.erl b/src/ioq_opener.erl
index d85991e..5bcf065 100644
--- a/src/ioq_opener.erl
+++ b/src/ioq_opener.erl
@@ -12,7 +12,7 @@
 ]).
 -export([
     start_link/0,
-    %%fetch_pid_for/1,
+    fetch_pid_for/1,
     fetch_pid_for/2,
     fetch_pid_for/3,
     get_pid_for/1,
@@ -24,10 +24,14 @@
 
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
 
 
 -define(BY_USER, by_user).
 -define(BY_SHARD, by_shard).
+-define(BY_CLASS, by_class).
+-define(BY_FD, by_fd).
+-define(BY_DB, by_db).
 -define(DEFAULT_DISPATCH, ?BY_SHARD).
 -define(PDICT_MARKER, ioq_pid_for).
 
@@ -36,7 +40,7 @@
     idle = [] :: [{erlang:timestamp(), pid()}],
     pid_idx :: khash:khash(),
     monitors :: khash:khash(),
-    dispatch :: by_shard | by_user | undefined
+    dispatch :: ?BY_SHARD | ?BY_DB | ?BY_USER | ?BY_CLASS | ?BY_FD | undefined
  }).
 
 
@@ -46,6 +50,10 @@
 %%    fetch_pid_for(DbName, self()).
 
 
+fetch_pid_for(#ioq_request{}=Req) ->
+    gen_server:call(?MODULE, {fetch, Req}, infinity).
+
+
 %% TODO: cleanup the overloaded arity once experiments concluded
 %%fetch_pid_for(DbName, undefined) when is_binary(DbName) ->
 %%    fetch_pid_for(DbName, undefined, self());
@@ -111,8 +119,11 @@ init([]) ->
     {ok, Monitors} = khash:new(),
     Dispatch = case config:get("ioq.opener", "dispatch", undefined) of
         "by_shard" -> ?BY_SHARD;
-        "by_user" -> ?BY_USER;
-        _ -> ?DEFAULT_DISPATCH
+        "by_db"    -> ?BY_DB;
+        "by_user"  -> ?BY_USER;
+        "by_class" -> ?BY_CLASS;
+        "by_fd"    -> ?BY_FD;
+        _          -> ?DEFAULT_DISPATCH
     end,
     St = #st{
         pid_idx = PidIdx,
@@ -122,6 +133,30 @@ init([]) ->
     {ok, St}.
 
 
+handle_call({fetch, #ioq_request{}=Req}, _From, #st{dispatch=Dispatch}=St) ->
+    Key = case Dispatch of
+        ?BY_SHARD ->
+            Req#ioq_request.shard;
+        ?BY_DB ->
+            Req#ioq_request.db;
+        ?BY_USER ->
+            Req#ioq_request.user;
+        ?BY_CLASS ->
+            Req#ioq_request.class;
+        ?BY_FD ->
+            {fd, Req#ioq_request.fd}
+    end,
+    IOQPid = case khash:get(St#st.pid_idx, Key, not_found) of
+        not_found ->
+            {ok, Pid} = ioq_server2:start_link({Dispatch, Key}),
+            khash:put(St#st.pid_idx, Key, Pid),
+            khash:put(St#st.pid_idx, Pid, Key),
+            Pid;
+        Pid ->
+            Pid
+    end,
+    ok = add_monitor(St#st.monitors, Req#ioq_request.fd, IOQPid),
+    {reply, IOQPid, St};
 handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) ->
     Caller = case FdPid of
         undefined -> From;
@@ -134,7 +169,7 @@ handle_call({fetch, _DbName, UserCtx, FdPid}, From, #st{dispatch=?BY_USER}=St) -
     end,
     IOQPid = case khash:get(St#st.pid_idx, Name, not_found) of
         not_found ->
-            {ok, Pid} = ioq_server2:start_link({user, Name}),
+            {ok, Pid} = ioq_server2:start_link({?BY_USER, Name}),
             khash:put(St#st.pid_idx, Name, Pid),
             khash:put(St#st.pid_idx, Pid, Name),
             Pid;
@@ -151,7 +186,7 @@ handle_call({fetch, DbName, _UserCtx, FdPid}, From, #st{dispatch=?BY_SHARD}=St)
     %% TODO: DbName = drop_compact_ext(DbName0),
     IOQPid = case khash:get(St#st.pid_idx, DbName, not_found) of
         not_found ->
-            {ok, Pid} = ioq_server2:start_link({shard, DbName}),
+            {ok, Pid} = ioq_server2:start_link({?BY_SHARD, DbName}),
             khash:put(St#st.pid_idx, DbName, Pid),
             khash:put(St#st.pid_idx, Pid, DbName),
             Pid;
@@ -210,7 +245,8 @@ add_monitor(Mons, FdPid, IOQPid) ->
         not_found ->
             Ref0 = erlang:monitor(process, FdPid),
             khash:put(Mons, Ref0, PidKey),
-            khash:put(Mons, PidKey, Ref0);
+            khash:put(Mons, PidKey, Ref0),
+            khash:put(Mons, FdPid, Ref0);
         Ref0 ->
             Ref0
     end,
@@ -233,12 +269,13 @@ drop_monitor(Mons, Ref) when is_reference(Ref) ->
         not_found ->
             %% TODO: shouldn't happen
             throw(unexpected);
-        {_FdPid, IOQPid}=PidKey ->
+        {FdPid, IOQPid}=PidKey ->
             case khash:get(Mons, IOQPid, not_found) of
                 not_found ->
                     %% TODO: shouldn't happen
                     throw(unexpected);
                 Refs ->
+                    khash:del(Mons, FdPid),
                     khash:del(Mons, Ref),
                     khash:del(Mons, PidKey),
                     case lists:delete(Ref, Refs) of
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index 595babe..85e3644 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -112,13 +112,14 @@ call(Fd, Msg, Dimensions) when Dimensions =/= undefined ->
         _ ->
             Server = case ioq_opener:get_pid_for(Fd) of
                 undefined ->
-                    %%case ioq_opener:get_pid_for(Req#ioq_request.shard) of
-                    %%    undefined ->
-                    %%        ioq_server2;
-                    %%    IOQPid ->
-                    %%        IOQPid
-                    %%end;
-                    ioq_server2;
+                    IOQPid = case ioq_opener:fetch_pid_for(Req) of
+                        undefined ->
+                            ioq_server2;
+                        IOQPid0 ->
+                            IOQPid0
+                    end,
+                    ioq_opener:set_pid_for(Fd, IOQPid),
+                    IOQPid;
                 IOQPid ->
                     IOQPid
             end,
@@ -270,9 +271,9 @@ start_link() ->
 
 start_link(?MODULE) ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [{global, ?MODULE}], []);
-start_link({user, _Name}=User) ->
+start_link({by_user, _Name}=User) ->
     gen_server:start_link(?MODULE, [User], []);
-start_link({shard, _Name}=Shard) ->
+start_link({by_shard, _Name}=Shard) ->
     gen_server:start_link(?MODULE, [Shard], []).
 
 
@@ -803,11 +804,6 @@ test_simple_dedupe(St0) ->
         from = FromA,
         key = {Fd, Pos}
     },
-    _Request1B = Request0#ioq_request{
-        init_priority = Priority,
-        from = FromA,
-        key = {Fd, Pos}
-    },
     {noreply, St2, 0} = handle_call(Request0, FromA, St1),
     {noreply, St3, 0} = handle_call(Request0, FromB, St2),
     {reply, RespState, _St4, 0} = handle_call(get_state, FromA, St3),
@@ -877,7 +873,7 @@ test_auto_scale(#state{queue=HQ}=St0) ->
     {_, #ioq_request{init_priority=PriorityA2}} = hqueue:extract_max(HQ),
     Tests0 = [?_assertEqual(PriorityA, PriorityA2)],
     {_St, Tests} = lists:foldl(
-        fun(_N, {#state{iterations=I, resize_limit=RL}=StN0, TestsN}) ->
+        fun(_N, {#state{iterations=I, resize_limit=_RL}=StN0, TestsN}) ->
             ReqN = BaseReq#ioq_request{ref=make_ref()},
             ExpectedPriority = case I == 1 of
                 false -> PriorityA;
@@ -968,7 +964,7 @@ cleanup(Servers) ->
 
 
 instantiate(S) ->
-    Old = ?DEFAULT_CONCURRENCY * length(ioq_sup:get_ioq2_servers()),
+    Old = ?DEFAULT_CONCURRENCY * (1 + length(shards())),
     [{inparallel, lists:map(fun(IOClass) ->
         lists:map(fun(Shard) ->
             check_call(S, make_ref(), priority(IOClass, Shard))
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl
index c2502c5..c228241 100644
--- a/test/ioq_tests.erl
+++ b/test/ioq_tests.erl
@@ -35,14 +35,16 @@ cleanup({Apps, Server}) ->
     exit(Server, kill).
 
 instantiate({_, S}) ->
+    Shards = shards(),
     [{inparallel, lists:map(fun(IOClass) ->
         lists:map(fun(Shard) ->
             check_call(S, make_ref(), priority(IOClass, Shard))
-        end, shards())
+        end, Shards)
     end, io_classes())},
     case ioq:ioq2_enabled() of
         true ->
-            ?_assertEqual(1, ioq:set_disk_concurrency(10));
+            %% TODO: don't assume IOQ2 concurrency is 1
+            ?_assertEqual(1 + length(Shards), ioq:set_disk_concurrency(10));
         false ->
             ?_assertEqual(20, ioq:set_disk_concurrency(10))
     end,

[couchdb-ioq] 07/07: Add ioq helper functions

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 55c8609d201c769090a948b98428ae416ebae56a
Author: Russell Branca <ch...@apache.org>
AuthorDate: Mon Aug 23 17:59:35 2021 -0700

    Add ioq helper functions
---
 src/ioq.erl | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/src/ioq.erl b/src/ioq.erl
index 2530dba..2448c43 100644
--- a/src/ioq.erl
+++ b/src/ioq.erl
@@ -22,6 +22,13 @@
     get_pid_for/1,
     set_pid_for/2
 ]).
+-export([
+    fd_pid/1,
+    ioq_pid/1,
+    cache/1
+]).
+
+-include_lib("ioq/include/ioq.hrl").
 
 -define(APPS, [config, folsom, couch_stats, ioq]).
 
@@ -94,3 +101,12 @@ get_pid_for(FdPid) ->
 set_pid_for(FdPid, IOQPid) ->
     ioq_opener:set_pid_for(FdPid, IOQPid).
 
+fd_pid(#ioq_file{fd=Fd}) ->
+    Fd.
+
+ioq_pid(#ioq_file{ioq=IOQ}) ->
+    IOQ.
+
+cache(#ioq_file{tab=Tab}) ->
+    Tab.
+