You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2020/06/28 19:44:03 UTC
[couchdb-ioq] 03/03: Use ioq_q in ioq_server
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch optimize-ioq-server
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git
commit 396c77cdc1740876a46445eb40a7f08aa0bbc0b3
Author: Paul J. Davis <da...@us.ibm.com>
AuthorDate: Sun Jun 28 14:42:29 2020 -0500
Use ioq_q in ioq_server
I need to add request deduplication, backpressure, and load shedding next.
We can still overwhelm ioq_server when traffic spikes, but it's at least
not as sudden.
---
src/ioq_server.erl | 186 ++++++++++++++---------------------------------------
1 file changed, 49 insertions(+), 137 deletions(-)
diff --git a/src/ioq_server.erl b/src/ioq_server.erl
index fce5071..1bce2d3 100644
--- a/src/ioq_server.erl
+++ b/src/ioq_server.erl
@@ -21,9 +21,9 @@
-record(channel, {
name,
- qI = queue:new(), % client readers
- qU = queue:new(), % db updater
- qV = queue:new() % view index updates
+ qI = ioq_q:new(), % client readers
+ qU = ioq_q:new(), % db updater
+ qV = ioq_q:new() % view index updates
}).
-record(state, {
@@ -31,10 +31,11 @@
histos,
reqs = [],
concurrency = 20,
- channels = queue:new(),
- qC = queue:new(), % compaction
- qR = queue:new(), % internal replication
- qL = queue:new(),
+ channels_by_name,
+ channels = ioq_q:new(),
+ qC = ioq_q:new(), % compaction
+ qR = ioq_q:new(), % internal replication
+ qL = ioq_q:new(),
dedupe,
class_priorities,
op_priorities
@@ -51,14 +52,9 @@
tsub
}).
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% WARNING %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-%% This server relies on the internal structure of the channels queue as a %%
-%% {list(), list()} to do in-place modifications of some elements. We are %%
-%% "running on thin ice", as it were. %%
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-
start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+ Opts = [{spawn_opt, [{fullsweep_after, 10}]}],
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], Opts).
call(Fd, Msg, Priority) ->
{Class, Channel} = analyze_priority(Priority),
@@ -89,7 +85,10 @@ list_custom_channels() ->
ioq_kv:all().
init([]) ->
+ process_flag(message_queue_data, off_heap),
+ {ok, ChannelsByName} = khash:new(),
State = #state{
+ channels_by_name = ChannelsByName,
counters = ets:new(ioq_counters, []),
histos = ets:new(ioq_histos, [named_table, ordered_set])
},
@@ -110,12 +109,12 @@ handle_call(get_counters, _From, #state{counters = Tab} = State) ->
handle_call(get_queue_depths, _From, State) ->
Channels = lists:map(fun(#channel{name=N, qI=I, qU=U, qV=V}) ->
- {N, [queue:len(I), queue:len(U), queue:len(V)]}
- end, queue:to_list(State#state.channels)),
+ {N, [ioq_q:len(I), ioq_q:len(U), ioq_q:len(V)]}
+ end, ioq_q:to_list(State#state.channels)),
Response = [
- {compaction, queue:len(State#state.qC)},
- {replication, queue:len(State#state.qR)},
- {low, queue:len(State#state.qL)},
+ {compaction, ioq_q:len(State#state.qC)},
+ {replication, ioq_q:len(State#state.qR)},
+ {low, ioq_q:len(State#state.qL)},
{channels, {Channels}}
],
{reply, Response, State, 0};
@@ -144,11 +143,9 @@ handle_cast(_Msg, State) ->
handle_info({Ref, Reply}, #state{reqs = Reqs} = State) ->
case lists:keytake(Ref, #request.ref, Reqs) of
- {value, #request{from=From} = Req, Reqs2} ->
- TResponse = erlang:monotonic_time(),
+ {value, #request{from=From}, Reqs2} ->
erlang:demonitor(Ref, [flush]),
reply_to_all(From, Reply),
- update_histograms(ioq_histos, Req, TResponse),
{noreply, State#state{reqs = Reqs2}, 0};
false ->
{noreply, State, 0}
@@ -256,17 +253,18 @@ enqueue_request(#request{class = low} = Req, State) ->
enqueue_request(Req, State) ->
enqueue_channel(Req, State).
-find_channel(Account, {A, B}) ->
- case lists:keyfind(Account, #channel.name, A) of
- false ->
- case lists:keyfind(Account, #channel.name, B) of
- false ->
- {new, #channel{name = Account}};
- #channel{} = Channel ->
- {2, Channel}
- end;
- #channel{} = Channel ->
- {1, Channel}
+find_channel(Account, #state{} = State) ->
+ #state{
+ channels_by_name = ChannelsByName
+ } = State,
+ case khash:lookup(ChannelsByName, Account) of
+ {value, Channel} ->
+ couch_log:error("XKCD: found channel", []),
+ Channel;
+ not_found ->
+ Channel = #channel{name = Account},
+ khash:put(ChannelsByName, Account, Channel),
+ {new, Channel}
end.
update_channel(Ch, #request{class = view_update} = Req, Dedupe) ->
@@ -277,66 +275,20 @@ update_channel(Ch, Req, Dedupe) ->
% everything else is interactive IO class
Ch#channel{qI = update_queue(Req, Ch#channel.qI, Dedupe)}.
-update_queue(#request{from=From, fd=Fd, msg={pread_iolist, Pos}}=Req, Q, DD) ->
- case maybe_dedupe(Fd, Pos, Q, DD) of
- false ->
- queue:in(Req, Q);
- {Elem, N, #request{from=From1}=Match} ->
- catch couch_stats:increment_counter([couchdb, io_queue, merged]),
- Match1 = Match#request{from=append(From, From1)},
- L = element(Elem, Q),
- {H, [Match|T]} = lists:split(N, L),
- setelement(Elem, Q, H ++ [Match1|T])
- end;
update_queue(Req, Q, _Dedupe) ->
- queue:in(Req, Q).
-
-append(A, B) when is_list(B) ->
- [A|B];
-append(A, B) ->
- [A, B].
-
-maybe_dedupe(Fd, Pos, Q, Dedupe) ->
- case Dedupe of
- true -> matching_request(Fd, Pos, Q);
- false -> false
- end.
-
-matching_request(Fd, Pos, {A, B}) ->
- case matching_request(Fd, Pos, A) of
- false ->
- case matching_request(Fd, Pos, B) of
- false ->
- false;
- {N, Request} ->
- {2, N, Request}
- end;
- {N, Request} ->
- {1, N, Request}
- end;
-matching_request(Fd, Pos, List) ->
- matching_request(Fd, Pos, 0, List).
-
-matching_request(_Fd, _Pos, _N, []) ->
- false;
-matching_request(Fd, Pos, N, [#request{fd=Fd, msg={pread_iolist, Pos}}=Req|_]) ->
- {N, Req};
-matching_request(Fd, Pos, N, [_|Rest]) ->
- matching_request(Fd, Pos, N + 1, Rest).
+ ioq_q:in(Req, Q).
enqueue_channel(#request{channel=Account} = Req, #state{channels=Q} = State) ->
DD = State#state.dedupe,
- case find_channel(Account, Q) of
- {new, Channel0} ->
- State#state{channels = queue:in(update_channel(Channel0, Req, DD), Q)};
- {Elem, Channel0} ->
- Channel = update_channel(Channel0, Req, DD),
- % the channel already exists in the queue - update it in place
- L = element(Elem, Q),
- NewL = lists:keyreplace(Account, #channel.name, L, Channel),
- NewQ = setelement(Elem, Q, NewL),
- State#state{channels = NewQ}
- end.
+ % ioq_q's are update-in-place
+ case find_channel(Account, State) of
+ {new, Channel} ->
+ update_channel(Channel, Req, DD),
+ ioq_q:in(Channel, Q);
+ Channel ->
+ update_channel(Channel, Req, DD)
+ end,
+ State.
maybe_submit_request(#state{concurrency=C,reqs=R} = St) when length(R) < C ->
case make_next_request(St) of
@@ -367,6 +319,7 @@ sort_queues([{Q, Pri} | Rest], Norm, Choice, X, Skipped, Acc) ->
make_next_request(State) ->
#state{
+ channels_by_name = ChByName,
channels = Ch,
qC = QC,
qR = QR,
@@ -387,13 +340,15 @@ make_next_request(State) ->
% queue after we've extracted one here
{Item2, [QI2, QU2, QV2]} =
choose_next_request([QI, QU, QV], OpP),
- case queue:is_empty(QU2) andalso
- queue:is_empty(QI2) andalso
- queue:is_empty(QV2) of
+ case ioq_q:is_empty(QU2) andalso
+ ioq_q:is_empty(QI2) andalso
+ ioq_q:is_empty(QV2) of
true ->
+ % An empty channel needs to be removed from channels_by_name
+ khash:del(ChByName, Channel#channel.name),
NewCh2 = NewCh;
false ->
- NewCh2 = queue:in(Channel#channel{qI=QI2, qU=QU2, qV=QV2}, NewCh)
+ NewCh2 = ioq_q:in(Channel#channel{qI=QI2, qU=QU2, qV=QV2}, NewCh)
end,
submit_request(Item2, State#state{channels=NewCh2, qC=NewQC,
qR=NewQR, qL=NewQL});
@@ -430,55 +385,12 @@ submit_request(Request, State) ->
update_counter(Tab, Channel, IOClass, RW) ->
upsert(Tab, {Channel, IOClass, RW}, 1).
-update_histograms(Tab, Req, TResponse) ->
- #request{t0=T0, tsub=TSubmit, class=Class, channel=Channel, msg=Msg} = Req,
- Delta1 = erlang:convert_time_unit(
- TSubmit - T0, native, microsecond),
- Delta2 = erlang:convert_time_unit(
- TResponse - TSubmit, native, microsecond),
- Bin1 = timebin(Delta1),
- Bin2 = timebin(Delta2),
- Bin3 = timebin(Delta1+Delta2),
- if Channel =/= nil ->
- upsert(Tab, {Channel, submit_delay, Bin1}, 1),
- upsert(Tab, {Channel, svctm, Bin2}, 1),
- upsert(Tab, {Channel, iowait, Bin3}, 1);
- true -> ok end,
- Key = make_key(Class, Msg),
- upsert(Tab, {Key, submit_delay, Bin1}, 1),
- upsert(Tab, {Key, svctm, Bin2}, 1),
- upsert(Tab, {Key, iowait, Bin3}, 1).
-
-make_key(db_compact, _) ->
- <<"compaction">>;
-make_key(view_compact, _) ->
- <<"compaction">>;
-make_key(internal_repl, _) ->
- <<"replication">>;
-make_key(low, _) ->
- <<"low">>;
-make_key(view_update, _) ->
- <<"views">>;
-make_key(db_update, _) ->
- <<"writes">>;
-make_key(interactive, {pread_iolist, _}) ->
- <<"reads">>;
-make_key(interactive, {append_bin, _}) ->
- <<"writes">>;
-make_key(_, _) ->
- <<"other">>.
-
upsert(Tab, Key, Incr) ->
try ets:update_counter(Tab, Key, Incr)
catch error:badarg ->
ets:insert(Tab, {Key, Incr})
end.
-timebin(0) ->
- 0;
-timebin(V) ->
- trunc(10*math:log10(V)).
-
choose_next_request(Qs, Priorities) ->
Norm = lists:sum(Priorities),
QueuesAndPriorities = lists:zip(Qs, Priorities),
@@ -493,7 +405,7 @@ choose_prioritized_request(Qs) ->
choose_prioritized_request([], Empties) ->
{nil, lists:reverse(Empties)};
choose_prioritized_request([Q | Rest], Empties) ->
- case queue:out(Q) of
+ case ioq_q:out(Q) of
{empty, _} ->
choose_prioritized_request(Rest, [Q | Empties]);
{{value, Item}, NewQ} ->