You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2020/06/29 20:25:28 UTC

[couchdb-ioq] 01/03: Implement request deduplication

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch optimize-ioq-server
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 4039fe6d721ea6b444c9550550ca29f8d4ec72dd
Author: Paul J. Davis <da...@us.ibm.com>
AuthorDate: Mon Jun 29 13:54:36 2020 -0500

    Implement request deduplication
    
    This adds a khash table to each ioq_q, which ioq_server then uses to
    deduplicate read requests (pread_iolist calls for the same fd and
    position) within a given channel.
---
 src/ioq_q.erl      | 39 +++++++++++++++++++++++++++++----------
 src/ioq_server.erl | 22 +++++++++++++++++++++-
 2 files changed, 50 insertions(+), 11 deletions(-)

diff --git a/src/ioq_q.erl b/src/ioq_q.erl
index 8b592b6..1889d81 100644
--- a/src/ioq_q.erl
+++ b/src/ioq_q.erl
@@ -20,26 +20,31 @@
     in/2,
     out/1,
 
+    lookup/2,
+    put/3,
+    del/2,
+
     to_list/1
 ]).
 
 
 new() ->
     {ok, Q} = khash:new(),
+    {ok, H} = khash:new(),
     ok = khash:put(Q, next_id, 0),
     ok = khash:put(Q, oldest_id, undefined),
-    Q.
+    {Q, H}.
 
 
-len(Q) ->
+len({Q, _}) ->
     khash:size(Q) - 2. % two metadata keys
 
 
-is_empty(Q) ->
-    len(Q) == 0.
+is_empty({Q, H}) ->
+    len({Q, H}) == 0.
 
 
-in(Item, Q) ->
+in(Item, {Q, H}) ->
     NextId = khash:get(Q, next_id),
     ok = khash:put(Q, NextId, Item),
     ok = khash:put(Q, next_id, NextId + 1),
@@ -49,14 +54,14 @@ in(Item, Q) ->
         _ ->
             ok
     end,
-    Q.
+    {Q, H}.
 
 
-out(Q) ->
+out({Q, H}) ->
     NextId = khash:get(Q, next_id),
     case khash:get(Q, oldest_id) of
         undefined ->
-            {empty, Q};
+            {empty, {Q, H}};
         OldestId ->
             {value, Item} = khash:lookup(Q, OldestId),
             khash:del(Q, OldestId),
@@ -67,11 +72,25 @@ out(Q) ->
                 false ->
                     khash:put(Q, oldest_id, OldestId + 1)
             end,
-            {{value, Item}, Q}
+            {{value, Item}, {Q, H}}
     end.
 
 
-to_list(Q) ->
+lookup({_Q, H}, Key) ->
+    khash:lookup(H, Key).
+
+
+put({Q, H}, Key, Value) ->
+    khash:put(H, Key, Value),
+    {Q, H}.
+
+
+del({Q, H}, Key) ->
+    khash:del(H, Key),
+    {Q, H}.
+
+
+to_list({Q, _H}) ->
     Entries = khash:to_list(Q),
     NoMeta = [{K, V} || {K, V} <- Entries, is_integer(K)],
     Sorted = lists:sort(NoMeta),
diff --git a/src/ioq_server.erl b/src/ioq_server.erl
index 1bce2d3..c66bb30 100644
--- a/src/ioq_server.erl
+++ b/src/ioq_server.erl
@@ -259,7 +259,6 @@ find_channel(Account, #state{} = State) ->
     } = State,
     case khash:lookup(ChannelsByName, Account) of
         {value, Channel} ->
-            couch_log:error("XKCD: found channel", []),
             Channel;
         not_found ->
             Channel = #channel{name = Account},
@@ -275,9 +274,26 @@ update_channel(Ch, Req, Dedupe) ->
     % everything else is interactive IO class
     Ch#channel{qI = update_queue(Req, Ch#channel.qI, Dedupe)}.
 
+update_queue(#request{from=From, fd=Fd, msg={pread_iolist, Pos}}=R, Q, true) ->
+    Key = {dedupe, Fd, Pos},
+    Req = case ioq_q:lookup(Q, Key) of
+        {value, #request{from = PrevFrom} = PrevReq} ->
+            PrevReq#request{
+                from = append(From, PrevFrom)
+            };
+        not_found ->
+            ioq_q:in(Key, Q),
+            R
+    end,
+    ioq_q:put(Q, Key, Req);
 update_queue(Req, Q, _Dedupe) ->
     ioq_q:in(Req, Q).
 
+append(A, B) when is_list(B) ->
+    [A|B];
+append(A, B) ->
+    [A, B].
+
 enqueue_channel(#request{channel=Account} = Req, #state{channels=Q} = State) ->
     DD = State#state.dedupe,
     % ioq_q's are update-in-place
@@ -408,6 +424,10 @@ choose_prioritized_request([Q | Rest], Empties) ->
     case ioq_q:out(Q) of
     {empty, _} ->
         choose_prioritized_request(Rest, [Q | Empties]);
+    {{value, {dedupe, _Fd, _Pos} = Key}, NewQ} ->
+        {value, Req} = ioq_q:lookup(Q, Key),
+        ioq_q:del(Q, Key),
+        {Req, lists:reverse([NewQ | Empties], Rest)};
     {{value, Item}, NewQ} ->
         {Item, lists:reverse([NewQ | Empties], Rest)}
     end.