You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2020/06/28 19:44:03 UTC

[couchdb-ioq] 03/03: Use ioq_q in ioq_server

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch optimize-ioq-server
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 396c77cdc1740876a46445eb40a7f08aa0bbc0b3
Author: Paul J. Davis <da...@us.ibm.com>
AuthorDate: Sun Jun 28 14:42:29 2020 -0500

    Use ioq_q in ioq_server
    
    I need to add request dedupes, backpressure, and load shedding next.
    We can still overwhelm ioq_server when traffic spikes, but it's at least
    not as sudden.
---
 src/ioq_server.erl | 186 ++++++++++++++---------------------------------------
 1 file changed, 49 insertions(+), 137 deletions(-)

diff --git a/src/ioq_server.erl b/src/ioq_server.erl
index fce5071..1bce2d3 100644
--- a/src/ioq_server.erl
+++ b/src/ioq_server.erl
@@ -21,9 +21,9 @@
 
 -record(channel, {
     name,
-    qI = queue:new(), % client readers
-    qU = queue:new(), % db updater
-    qV = queue:new()  % view index updates
+    qI = ioq_q:new(), % client readers
+    qU = ioq_q:new(), % db updater
+    qV = ioq_q:new()  % view index updates
 }).
 
 -record(state, {
@@ -31,10 +31,11 @@
     histos,
     reqs = [],
     concurrency = 20,
-    channels = queue:new(),
-    qC = queue:new(), % compaction
-    qR = queue:new(), % internal replication
-    qL = queue:new(),
+    channels_by_name,
+    channels = ioq_q:new(),
+    qC = ioq_q:new(), % compaction
+    qR = ioq_q:new(), % internal replication
+    qL = ioq_q:new(),
     dedupe,
     class_priorities,
     op_priorities
@@ -51,14 +52,9 @@
     tsub
 }).
 
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% WARNING %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-%% This server relies on the internal structure of the channels queue as a   %%
-%% {list(), list()} to do in-place modifications of some elements.  We are   %%
-%% "running on thin ice", as it were.                                        %%
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-
 start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+    Opts = [{spawn_opt, [{fullsweep_after, 10}]}],
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], Opts).
 
 call(Fd, Msg, Priority) ->
     {Class, Channel} = analyze_priority(Priority),
@@ -89,7 +85,10 @@ list_custom_channels() ->
     ioq_kv:all().
 
 init([]) ->
+    process_flag(message_queue_data, off_heap),
+    {ok, ChannelsByName} = khash:new(),
     State = #state{
+        channels_by_name = ChannelsByName,
         counters = ets:new(ioq_counters, []),
         histos = ets:new(ioq_histos, [named_table, ordered_set])
     },
@@ -110,12 +109,12 @@ handle_call(get_counters, _From, #state{counters = Tab} = State) ->
 
 handle_call(get_queue_depths, _From, State) ->
     Channels = lists:map(fun(#channel{name=N, qI=I, qU=U, qV=V}) ->
-        {N, [queue:len(I), queue:len(U), queue:len(V)]}
-    end, queue:to_list(State#state.channels)),
+        {N, [ioq_q:len(I), ioq_q:len(U), ioq_q:len(V)]}
+    end, ioq_q:to_list(State#state.channels)),
     Response = [
-        {compaction, queue:len(State#state.qC)},
-        {replication, queue:len(State#state.qR)},
-        {low, queue:len(State#state.qL)},
+        {compaction, ioq_q:len(State#state.qC)},
+        {replication, ioq_q:len(State#state.qR)},
+        {low, ioq_q:len(State#state.qL)},
         {channels, {Channels}}
     ],
     {reply, Response, State, 0};
@@ -144,11 +143,9 @@ handle_cast(_Msg, State) ->
 
 handle_info({Ref, Reply}, #state{reqs = Reqs} = State) ->
     case lists:keytake(Ref, #request.ref, Reqs) of
-    {value, #request{from=From} = Req, Reqs2} ->
-        TResponse = erlang:monotonic_time(),
+    {value, #request{from=From}, Reqs2} ->
         erlang:demonitor(Ref, [flush]),
         reply_to_all(From, Reply),
-        update_histograms(ioq_histos, Req, TResponse),
         {noreply, State#state{reqs = Reqs2}, 0};
     false ->
         {noreply, State, 0}
@@ -256,17 +253,18 @@ enqueue_request(#request{class = low} = Req, State) ->
 enqueue_request(Req, State) ->
     enqueue_channel(Req, State).
 
-find_channel(Account, {A, B}) ->
-    case lists:keyfind(Account, #channel.name, A) of
-    false ->
-        case lists:keyfind(Account, #channel.name, B) of
-        false ->
-            {new, #channel{name = Account}};
-        #channel{} = Channel ->
-            {2, Channel}
-        end;
-    #channel{} = Channel ->
-        {1, Channel}
+find_channel(Account, #state{} = State) ->
+    #state{
+        channels_by_name = ChannelsByName
+    } = State,
+    case khash:lookup(ChannelsByName, Account) of
+        {value, Channel} ->
+            couch_log:error("XKCD: found channel", []),
+            Channel;
+        not_found ->
+            Channel = #channel{name = Account},
+            khash:put(ChannelsByName, Account, Channel),
+            {new, Channel}
     end.
 
 update_channel(Ch, #request{class = view_update} = Req, Dedupe) ->
@@ -277,66 +275,20 @@ update_channel(Ch, Req, Dedupe) ->
     % everything else is interactive IO class
     Ch#channel{qI = update_queue(Req, Ch#channel.qI, Dedupe)}.
 
-update_queue(#request{from=From, fd=Fd, msg={pread_iolist, Pos}}=Req, Q, DD) ->
-    case maybe_dedupe(Fd, Pos, Q, DD) of
-    false ->
-        queue:in(Req, Q);
-    {Elem, N, #request{from=From1}=Match} ->
-        catch couch_stats:increment_counter([couchdb, io_queue, merged]),
-        Match1 = Match#request{from=append(From, From1)},
-        L = element(Elem, Q),
-        {H, [Match|T]} = lists:split(N, L),
-        setelement(Elem, Q, H ++ [Match1|T])
-    end;
 update_queue(Req, Q, _Dedupe) ->
-    queue:in(Req, Q).
-
-append(A, B) when is_list(B) ->
-    [A|B];
-append(A, B) ->
-    [A, B].
-
-maybe_dedupe(Fd, Pos, Q, Dedupe) ->
-    case Dedupe of
-        true -> matching_request(Fd, Pos, Q);
-        false -> false
-    end.
-
-matching_request(Fd, Pos, {A, B}) ->
-    case matching_request(Fd, Pos, A) of
-    false ->
-        case matching_request(Fd, Pos, B) of
-        false ->
-            false;
-        {N, Request} ->
-            {2, N, Request}
-        end;
-    {N, Request} ->
-        {1, N, Request}
-    end;
-matching_request(Fd, Pos, List) ->
-    matching_request(Fd, Pos, 0, List).
-
-matching_request(_Fd, _Pos, _N, []) ->
-    false;
-matching_request(Fd, Pos, N, [#request{fd=Fd, msg={pread_iolist, Pos}}=Req|_]) ->
-    {N, Req};
-matching_request(Fd, Pos, N, [_|Rest]) ->
-    matching_request(Fd, Pos, N + 1, Rest).
+    ioq_q:in(Req, Q).
 
 enqueue_channel(#request{channel=Account} = Req, #state{channels=Q} = State) ->
     DD = State#state.dedupe,
-    case find_channel(Account, Q) of
-    {new, Channel0} ->
-        State#state{channels = queue:in(update_channel(Channel0, Req, DD), Q)};
-    {Elem, Channel0} ->
-        Channel = update_channel(Channel0, Req, DD),
-        % the channel already exists in the queue - update it in place
-        L = element(Elem, Q),
-        NewL = lists:keyreplace(Account, #channel.name, L, Channel),
-        NewQ = setelement(Elem, Q, NewL),
-        State#state{channels = NewQ}
-    end.
+    % ioq_q's are update-in-place
+    case find_channel(Account, State) of
+        {new, Channel} ->
+            update_channel(Channel, Req, DD),
+            ioq_q:in(Channel, Q);
+        Channel ->
+            update_channel(Channel, Req, DD)
+    end,
+    State.
 
 maybe_submit_request(#state{concurrency=C,reqs=R} = St) when length(R) < C ->
     case make_next_request(St) of
@@ -367,6 +319,7 @@ sort_queues([{Q, Pri} | Rest], Norm, Choice, X, Skipped, Acc) ->
 
 make_next_request(State) ->
     #state{
+        channels_by_name = ChByName,
         channels = Ch,
         qC = QC,
         qR = QR,
@@ -387,13 +340,15 @@ make_next_request(State) ->
         % queue after we've extracted one here
         {Item2, [QI2, QU2, QV2]} =
             choose_next_request([QI, QU, QV], OpP),
-        case queue:is_empty(QU2) andalso
-             queue:is_empty(QI2) andalso
-             queue:is_empty(QV2) of
+        case ioq_q:is_empty(QU2) andalso
+             ioq_q:is_empty(QI2) andalso
+             ioq_q:is_empty(QV2) of
         true ->
+            % An empty channel needs to be removed from channels_by_name
+            khash:del(ChByName, Channel#channel.name),
             NewCh2 = NewCh;
         false ->
-            NewCh2 = queue:in(Channel#channel{qI=QI2, qU=QU2, qV=QV2}, NewCh)
+            NewCh2 = ioq_q:in(Channel#channel{qI=QI2, qU=QU2, qV=QV2}, NewCh)
         end,
         submit_request(Item2, State#state{channels=NewCh2, qC=NewQC,
             qR=NewQR, qL=NewQL});
@@ -430,55 +385,12 @@ submit_request(Request, State) ->
 update_counter(Tab, Channel, IOClass, RW) ->
     upsert(Tab, {Channel, IOClass, RW}, 1).
 
-update_histograms(Tab, Req, TResponse) ->
-    #request{t0=T0, tsub=TSubmit, class=Class, channel=Channel, msg=Msg} = Req,
-    Delta1 = erlang:convert_time_unit(
-        TSubmit - T0, native, microsecond),
-    Delta2 = erlang:convert_time_unit(
-        TResponse - TSubmit, native, microsecond),
-    Bin1 = timebin(Delta1),
-    Bin2 = timebin(Delta2),
-    Bin3 = timebin(Delta1+Delta2),
-    if Channel =/= nil ->
-        upsert(Tab, {Channel, submit_delay, Bin1}, 1),
-        upsert(Tab, {Channel, svctm, Bin2}, 1),
-        upsert(Tab, {Channel, iowait, Bin3}, 1);
-    true -> ok end,
-    Key = make_key(Class, Msg),
-    upsert(Tab, {Key, submit_delay, Bin1}, 1),
-    upsert(Tab, {Key, svctm, Bin2}, 1),
-    upsert(Tab, {Key, iowait, Bin3}, 1).
-
-make_key(db_compact, _) ->
-    <<"compaction">>;
-make_key(view_compact, _) ->
-    <<"compaction">>;
-make_key(internal_repl, _) ->
-    <<"replication">>;
-make_key(low, _) ->
-    <<"low">>;
-make_key(view_update, _) ->
-    <<"views">>;
-make_key(db_update, _) ->
-    <<"writes">>;
-make_key(interactive, {pread_iolist, _}) ->
-    <<"reads">>;
-make_key(interactive, {append_bin, _}) ->
-    <<"writes">>;
-make_key(_, _) ->
-    <<"other">>.
-
 upsert(Tab, Key, Incr) ->
     try ets:update_counter(Tab, Key, Incr)
     catch error:badarg ->
         ets:insert(Tab, {Key, Incr})
     end.
 
-timebin(0) ->
-    0;
-timebin(V) ->
-    trunc(10*math:log10(V)).
-
 choose_next_request(Qs, Priorities) ->
     Norm = lists:sum(Priorities),
     QueuesAndPriorities = lists:zip(Qs, Priorities),
@@ -493,7 +405,7 @@ choose_prioritized_request(Qs) ->
 choose_prioritized_request([], Empties) ->
     {nil, lists:reverse(Empties)};
 choose_prioritized_request([Q | Rest], Empties) ->
-    case queue:out(Q) of
+    case ioq_q:out(Q) of
     {empty, _} ->
         choose_prioritized_request(Rest, [Q | Empties]);
     {{value, Item}, NewQ} ->