You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2021/08/24 21:54:59 UTC

[couchdb-ioq] 06/07: Support IOQ2 pid per couch_file

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 41cd71c360bef825566b311b64539dfdc2ab5e70
Author: Russell Branca <ch...@apache.org>
AuthorDate: Tue Aug 17 15:11:34 2021 -0700

    Support IOQ2 pid per couch_file
---
 include/ioq.hrl     |  7 ++++++
 src/ioq_server2.erl | 65 ++++++++++++++++++++++++++++++++++++-----------------
 2 files changed, 52 insertions(+), 20 deletions(-)

diff --git a/include/ioq.hrl b/include/ioq.hrl
index 499a140..0577579 100644
--- a/include/ioq.hrl
+++ b/include/ioq.hrl
@@ -61,6 +61,13 @@
 }).
 
 
+-record(ioq_file, {
+    fd,
+    ioq,
+    tab
+}).
+
+
 -type io_priority() :: db_compact
     | db_update
     | interactive
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index 654b8c8..7269368 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -66,7 +66,8 @@
     next_key = 1 :: pos_integer(),
     server_name :: atom(),
     scheduler_id = 0 :: non_neg_integer(),
-    max_priority = ?DEFAULT_MAX_PRIORITY :: float()
+    max_priority = ?DEFAULT_MAX_PRIORITY :: float(),
+    fd :: pid() | undefined
 }).
 
 
@@ -102,6 +103,11 @@ call(Fd, Msg, Dimensions) ->
         t0 = os:timestamp()
     },
     Req = add_request_dimensions(Req0, Dimensions),
+    call_int(Fd, Req).
+
+%% TODO: handle Clouseau requests with isolated IOQ2 pid
+%%call_int(#ioq_file{fd={clouseau, _}=IOF, Req) ->
+call_int(#ioq_file{ioq=undefined, fd=Fd}, #ioq_request{msg=Msg}=Req) ->
     Class = atom_to_list(Req#ioq_request.class),
     case config:get_boolean("ioq2.bypass", Class, false) of
         true ->
@@ -111,21 +117,24 @@ call(Fd, Msg, Dimensions) ->
                 [couchdb, io_queue2, RW, bypassed_count]),
             gen_server:call(Fd, Msg, infinity);
         _ ->
-            Server = case ioq_opener:get_pid_for(Fd) of
-                undefined ->
-                    IOQPid = case ioq_opener:fetch_pid_for(Req) of
-                        undefined ->
-                            ioq_server2;
-                        IOQPid0 ->
-                            IOQPid0
-                    end,
-                    ioq_opener:set_pid_for(Fd, IOQPid),
-                    IOQPid;
-                IOQPid ->
-                    IOQPid
-            end,
+            Server = ioq_server2,
+            %%Server = case ioq_opener:get_pid_for(Fd) of
+            %%    undefined ->
+            %%        IOQPid = case ioq_opener:fetch_pid_for(Req) of
+            %%            undefined ->
+            %%                ioq_server2;
+            %%            IOQPid0 ->
+            %%                IOQPid0
+            %%        end,
+            %%        ioq_opener:set_pid_for(Fd, IOQPid),
+            %%        IOQPid;
+            %%    IOQPid ->
+            %%        IOQPid
+            %%end,
             gen_server:call(Server, Req, infinity)
-    end.
+    end;
+call_int(#ioq_file{ioq=IOP}, #ioq_request{}=Req) ->
+    gen_server:call(IOP, Req, infinity).
 
 
 -spec pcall(any()) -> any().
@@ -275,6 +284,8 @@ start_link(?MODULE) ->
 start_link({by_user, _Name}=User) ->
     gen_server:start_link(?MODULE, [User], []);
 start_link({by_shard, _Name}=Shard) ->
+    gen_server:start_link(?MODULE, [Shard], []);
+start_link({by_shard, _Name, _Fd}=Shard) ->
     gen_server:start_link(?MODULE, [Shard], []).
 
 
@@ -286,16 +297,25 @@ start_link(Name, SID, Bind) ->
     gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options).
 
 
-init([{Type, Name}]) ->
+init([Init]) ->
+    {Type, Name, Fd} = case Init of
+        {_, _, FdPid} = Init0 ->
+            erlang:link(FdPid),
+            Init0;
+        {Type0, Name0} ->
+            {Type0, Name0, undefined}
+    end,
     {ok, HQ} = hqueue:new(),
     {ok, Reqs} = khash:new(),
     {ok, Waiters} = khash:new(),
+    erlang:put(couch_file, Name),
     State = #state{
         queue = HQ,
         reqs = Reqs,
         waiters = Waiters,
         server_name = Name,
-        scheduler_id = Type
+        scheduler_id = Type,
+        fd = Fd
     },
     {ok, update_config_int(State)}.
 
@@ -328,8 +348,8 @@ handle_call(get_pending_reqs, _From, #state{queue=HQ}=State) ->
     {reply, hqueue:to_list(HQ), State, 0};
 handle_call(get_counters, _From, State) ->
     {reply, undefined, State, 0};
-handle_call(_, _From, State) ->
-    {reply, ok, State, 0}.
+handle_call(Msg, _From, State) ->
+    {reply, {error, unknown, Msg}, State, 0}.
 
 
 handle_cast(update_config, State) ->
@@ -444,13 +464,18 @@ submit_request(Req, #state{iterations=I, resize_limit=RL}=State) when I >= RL ->
     submit_request(Req, State#state{iterations=0});
 submit_request(Req, #state{iterations=Iterations}=State) ->
     #ioq_request{
-        fd = Fd,
+        fd = Fd0,
         msg = Call,
         class = Class,
         t0 = T0
     } = Req,
     #state{reqs = Reqs} = State,
 
+    Fd = case Fd0 of
+        #ioq_file{fd=Fd1} -> Fd1;
+        _                 -> Fd0
+    end,
+
     % make the request
     Ref = erlang:monitor(process, Fd),
     Fd ! {'$gen_call', {self(), Ref}, Call},