You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2021/08/24 21:54:59 UTC
[couchdb-ioq] 06/07: Support IOQ2 pid per couch_file
This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git
commit 41cd71c360bef825566b311b64539dfdc2ab5e70
Author: Russell Branca <ch...@apache.org>
AuthorDate: Tue Aug 17 15:11:34 2021 -0700
Support IOQ2 pid per couch_file
---
include/ioq.hrl | 7 ++++++
src/ioq_server2.erl | 65 ++++++++++++++++++++++++++++++++++++-----------------
2 files changed, 52 insertions(+), 20 deletions(-)
diff --git a/include/ioq.hrl b/include/ioq.hrl
index 499a140..0577579 100644
--- a/include/ioq.hrl
+++ b/include/ioq.hrl
@@ -61,6 +61,13 @@
}).
+-record(ioq_file, {
+ fd,
+ ioq,
+ tab
+}).
+
+
-type io_priority() :: db_compact
| db_update
| interactive
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index 654b8c8..7269368 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -66,7 +66,8 @@
next_key = 1 :: pos_integer(),
server_name :: atom(),
scheduler_id = 0 :: non_neg_integer(),
- max_priority = ?DEFAULT_MAX_PRIORITY :: float()
+ max_priority = ?DEFAULT_MAX_PRIORITY :: float(),
+ fd :: pid() | undefined
}).
@@ -102,6 +103,11 @@ call(Fd, Msg, Dimensions) ->
t0 = os:timestamp()
},
Req = add_request_dimensions(Req0, Dimensions),
+ call_int(Fd, Req).
+
+%% TODO: handle Clouseau requests with isolated IOQ2 pid
+%%call_int(#ioq_file{fd={clouseau, _}=IOF, Req) ->
+call_int(#ioq_file{ioq=undefined, fd=Fd}, #ioq_request{msg=Msg}=Req) ->
Class = atom_to_list(Req#ioq_request.class),
case config:get_boolean("ioq2.bypass", Class, false) of
true ->
@@ -111,21 +117,24 @@ call(Fd, Msg, Dimensions) ->
[couchdb, io_queue2, RW, bypassed_count]),
gen_server:call(Fd, Msg, infinity);
_ ->
- Server = case ioq_opener:get_pid_for(Fd) of
- undefined ->
- IOQPid = case ioq_opener:fetch_pid_for(Req) of
- undefined ->
- ioq_server2;
- IOQPid0 ->
- IOQPid0
- end,
- ioq_opener:set_pid_for(Fd, IOQPid),
- IOQPid;
- IOQPid ->
- IOQPid
- end,
+ Server = ioq_server2,
+ %%Server = case ioq_opener:get_pid_for(Fd) of
+ %% undefined ->
+ %% IOQPid = case ioq_opener:fetch_pid_for(Req) of
+ %% undefined ->
+ %% ioq_server2;
+ %% IOQPid0 ->
+ %% IOQPid0
+ %% end,
+ %% ioq_opener:set_pid_for(Fd, IOQPid),
+ %% IOQPid;
+ %% IOQPid ->
+ %% IOQPid
+ %%end,
gen_server:call(Server, Req, infinity)
- end.
+ end;
+call_int(#ioq_file{ioq=IOP}, #ioq_request{}=Req) ->
+ gen_server:call(IOP, Req, infinity).
-spec pcall(any()) -> any().
@@ -275,6 +284,8 @@ start_link(?MODULE) ->
start_link({by_user, _Name}=User) ->
gen_server:start_link(?MODULE, [User], []);
start_link({by_shard, _Name}=Shard) ->
+ gen_server:start_link(?MODULE, [Shard], []);
+start_link({by_shard, _Name, _Fd}=Shard) ->
gen_server:start_link(?MODULE, [Shard], []).
@@ -286,16 +297,25 @@ start_link(Name, SID, Bind) ->
gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options).
-init([{Type, Name}]) ->
+init([Init]) ->
+ {Type, Name, Fd} = case Init of
+ {_, _, FdPid} = Init0 ->
+ erlang:link(FdPid),
+ Init0;
+ {Type0, Name0} ->
+ {Type0, Name0, undefined}
+ end,
{ok, HQ} = hqueue:new(),
{ok, Reqs} = khash:new(),
{ok, Waiters} = khash:new(),
+ erlang:put(couch_file, Name),
State = #state{
queue = HQ,
reqs = Reqs,
waiters = Waiters,
server_name = Name,
- scheduler_id = Type
+ scheduler_id = Type,
+ fd = Fd
},
{ok, update_config_int(State)}.
@@ -328,8 +348,8 @@ handle_call(get_pending_reqs, _From, #state{queue=HQ}=State) ->
{reply, hqueue:to_list(HQ), State, 0};
handle_call(get_counters, _From, State) ->
{reply, undefined, State, 0};
-handle_call(_, _From, State) ->
- {reply, ok, State, 0}.
+handle_call(Msg, _From, State) ->
+ {reply, {error, unknown, Msg}, State, 0}.
handle_cast(update_config, State) ->
@@ -444,13 +464,18 @@ submit_request(Req, #state{iterations=I, resize_limit=RL}=State) when I >= RL ->
submit_request(Req, State#state{iterations=0});
submit_request(Req, #state{iterations=Iterations}=State) ->
#ioq_request{
- fd = Fd,
+ fd = Fd0,
msg = Call,
class = Class,
t0 = T0
} = Req,
#state{reqs = Reqs} = State,
+ Fd = case Fd0 of
+ #ioq_file{fd=Fd1} -> Fd1;
+ _ -> Fd0
+ end,
+
% make the request
Ref = erlang:monitor(process, Fd),
Fd ! {'$gen_call', {self(), Ref}, Call},