You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2020/06/29 20:25:30 UTC

[couchdb-ioq] 03/03: Shed load in ioq_server

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch optimize-ioq-server
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit a877a724cc47dd269796a30bdf623814126b40af
Author: Paul J. Davis <da...@us.ibm.com>
AuthorDate: Mon Jun 29 15:24:23 2020 -0500

    Shed load in ioq_server
    
    This avoids processing any request that we find either in our message
    box or in our internal queuing system that has already timed out and
    whose client has exited.
---
 src/ioq_benchmark.erl |  9 +++++--
 src/ioq_server.erl    | 73 ++++++++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 70 insertions(+), 12 deletions(-)

diff --git a/src/ioq_benchmark.erl b/src/ioq_benchmark.erl
index 4e2ea2d..8ef63e9 100644
--- a/src/ioq_benchmark.erl
+++ b/src/ioq_benchmark.erl
@@ -77,7 +77,8 @@ run(Opts) ->
     ets:new(?WORKERS, [public, set, named_table]),
     create_files(Opts),
     %run_static(Opts),
-    run_dynamic(Opts).
+    %run_dynamic(Opts).
+    run_static(Opts).
 
 
 run_static(Opts) ->
@@ -328,7 +329,11 @@ gen_priority_and_message(UserBin, ClassPerc, OpPerc) ->
         view ->
             case rand:uniform() < 0.6 of
                 true ->
-                    {pread_iolist, rand:uniform()};
+                    Pos = case rand:uniform() of
+                        V when V < 0.05 -> 0.0;
+                        V -> V
+                    end,
+                    {pread_iolist, Pos};
                 false ->
                     {append_binary, rand:uniform()}
             end
diff --git a/src/ioq_server.erl b/src/ioq_server.erl
index 1c8a2c6..4bc6eae 100644
--- a/src/ioq_server.erl
+++ b/src/ioq_server.erl
@@ -319,15 +319,21 @@ append(A, B) when is_list(B) ->
 append(A, B) ->
     [A, B].
 
-enqueue_channel(#request{channel=Account} = Req, #state{channels=Q} = State) ->
+enqueue_channel(#request{name = Account} = Req, #state{channels = Q} = State) ->
     DD = State#state.dedupe,
-    % ioq_q's are update-in-place
-    case find_channel(Account, State) of
-        {new, Channel} ->
-            update_channel(Channel, Req, DD),
-            ioq_q:in(Channel, Q);
-        Channel ->
-            update_channel(Channel, Req, DD)
+    % Shed load: skip requests that should_process_request/1 rejects.
+    case should_process_request(Req) of
+        true ->
+            % ioq_q's are update-in-place, so State is returned as-is
+            case find_channel(Account, State) of
+                {new, Channel} ->
+                    update_channel(Channel, Req, DD),
+                    ioq_q:in(Channel, Q);
+                Channel ->
+                    update_channel(Channel, Req, DD)
+            end;
+        false ->
+            ok
     end,
     State.
 
@@ -391,8 +397,26 @@ make_next_request(State) ->
         false ->
             NewCh2 = ioq_q:in(Channel#channel{qI=QI2, qU=QU2, qV=QV2}, NewCh)
         end,
-        submit_request(Item2, State#state{channels=NewCh2, qC=NewQC,
-            qR=NewQR, qL=NewQL});
+        NewState = State#state{
+            channels = NewCh2,
+            qC = NewQC,
+            qR = NewQR,
+            qL = NewQL
+        },
+        ShouldProcess = should_process_request(Item2),
+        case ShouldProcess of
+            true ->
+                submit_request(Item2, NewState);
+            false ->
+                % We're recursing here because of the logic
+                % in maybe_submit_request where returning
+                % NewState may actually end up being equal
+                % to the old state due to ioq_q instances
+                % now being references. This would be a bug
+                % that prevented us from using all of the
+                % available concurrent request slots.
+                make_next_request(NewState)
+        end;
     _ ->
         % Item is a background (compaction or internal replication) task
         submit_request(Item, State#state{channels=NewCh, qC=NewQC, qR=NewQR,
@@ -423,6 +447,35 @@ submit_request(Request, State) ->
     update_counter(Counters, Channel, IOClass, RW),
     State#state{reqs = [Request#request{tsub=SubmitTime, ref=Ref} | Reqs]}.
 
+
+% Decide whether a queued request is still worth submitting. Interactive
+% requests are dropped (false) only once they are older than
+% ?INITIAL_TIMEOUT milliseconds and none of the waiting client
+% processes is still alive; every other request yields true.
+should_process_request(#request{class = interactive, from = Froms, t0 = T0}) ->
+    % Elapsed time since t0, which is assumed to be a native-unit
+    % erlang:monotonic_time/0 reading -- TODO confirm where t0 is set.
+    Diff = erlang:monotonic_time() - T0,
+    MilliDiff = erlang:convert_time_unit(Diff, native, millisecond),
+
+    case MilliDiff > ?INITIAL_TIMEOUT of
+        false ->
+            true;
+        true ->
+            % from holds a single {Pid, Tag} or a list of them.
+            ClientPids = case Froms of
+                {ClientPid, _Tag} ->
+                    [ClientPid];
+                _ when is_list(Froms) ->
+                    [Pid || {Pid, _} <- Froms]
+            end,
+            % Only shed the request when no client is left to
+            % receive the reply.
+            lists:any(fun erlang:is_process_alive/1, ClientPids)
+    end;
+should_process_request(_Request) ->
+    true.
+
 update_counter(Tab, Channel, IOClass, RW) ->
     upsert(Tab, {Channel, IOClass, RW}, 1).