You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2020/06/29 20:25:28 UTC
[couchdb-ioq] 01/03: Implement request deduplication
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch optimize-ioq-server
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git
commit 4039fe6d721ea6b444c9550550ca29f8d4ec72dd
Author: Paul J. Davis <da...@us.ibm.com>
AuthorDate: Mon Jun 29 13:54:36 2020 -0500
Implement request deduplication
This adds a hash to each ioq_q which is then used by ioq_server to
deduplicate read requests to a given channel.
---
src/ioq_q.erl | 39 +++++++++++++++++++++++++++++----------
src/ioq_server.erl | 22 +++++++++++++++++++++-
2 files changed, 50 insertions(+), 11 deletions(-)
diff --git a/src/ioq_q.erl b/src/ioq_q.erl
index 8b592b6..1889d81 100644
--- a/src/ioq_q.erl
+++ b/src/ioq_q.erl
@@ -20,26 +20,31 @@
in/2,
out/1,
+ lookup/2,
+ put/3,
+ del/2,
+
to_list/1
]).
new() ->
{ok, Q} = khash:new(),
+ {ok, H} = khash:new(),
ok = khash:put(Q, next_id, 0),
ok = khash:put(Q, oldest_id, undefined),
- Q.
+ {Q, H}.
-len(Q) ->
+len({Q, _}) ->
khash:size(Q) - 2. % two metadata keys
-is_empty(Q) ->
- len(Q) == 0.
+is_empty({Q, H}) ->
+ len({Q, H}) == 0.
-in(Item, Q) ->
+in(Item, {Q, H}) ->
NextId = khash:get(Q, next_id),
ok = khash:put(Q, NextId, Item),
ok = khash:put(Q, next_id, NextId + 1),
@@ -49,14 +54,14 @@ in(Item, Q) ->
_ ->
ok
end,
- Q.
+ {Q, H}.
-out(Q) ->
+out({Q, H}) ->
NextId = khash:get(Q, next_id),
case khash:get(Q, oldest_id) of
undefined ->
- {empty, Q};
+ {empty, {Q, H}};
OldestId ->
{value, Item} = khash:lookup(Q, OldestId),
khash:del(Q, OldestId),
@@ -67,11 +72,25 @@ out(Q) ->
false ->
khash:put(Q, oldest_id, OldestId + 1)
end,
- {{value, Item}, Q}
+ {{value, Item}, {Q, H}}
end.
-to_list(Q) ->
+lookup({_Q, H}, Key) ->
+ khash:lookup(H, Key).
+
+
+put({Q, H}, Key, Value) ->
+ khash:put(H, Key, Value),
+ {Q, H}.
+
+
+del({Q, H}, Key) ->
+ khash:del(H, Key),
+ {Q, H}.
+
+
+to_list({Q, _H}) ->
Entries = khash:to_list(Q),
NoMeta = [{K, V} || {K, V} <- Entries, is_integer(K)],
Sorted = lists:sort(NoMeta),
diff --git a/src/ioq_server.erl b/src/ioq_server.erl
index 1bce2d3..c66bb30 100644
--- a/src/ioq_server.erl
+++ b/src/ioq_server.erl
@@ -259,7 +259,6 @@ find_channel(Account, #state{} = State) ->
} = State,
case khash:lookup(ChannelsByName, Account) of
{value, Channel} ->
- couch_log:error("XKCD: found channel", []),
Channel;
not_found ->
Channel = #channel{name = Account},
@@ -275,9 +274,26 @@ update_channel(Ch, Req, Dedupe) ->
% everything else is interactive IO class
Ch#channel{qI = update_queue(Req, Ch#channel.qI, Dedupe)}.
+update_queue(#request{from=From, fd=Fd, msg={pread_iolist, Pos}}=R, Q, true) ->
+ Key = {dedupe, Fd, Pos},
+ Req = case ioq_q:lookup(Q, Key) of
+ {value, #request{from = PrevFrom} = PrevReq} ->
+ PrevReq#request{
+ from = append(From, PrevFrom)
+ };
+ not_found ->
+ ioq_q:in(Key, Q),
+ R
+ end,
+ ioq_q:put(Q, Key, Req);
update_queue(Req, Q, _Dedupe) ->
ioq_q:in(Req, Q).
+append(A, B) when is_list(B) ->
+ [A|B];
+append(A, B) ->
+ [A, B].
+
enqueue_channel(#request{channel=Account} = Req, #state{channels=Q} = State) ->
DD = State#state.dedupe,
% ioq_q's are update-in-place
@@ -408,6 +424,10 @@ choose_prioritized_request([Q | Rest], Empties) ->
case ioq_q:out(Q) of
{empty, _} ->
choose_prioritized_request(Rest, [Q | Empties]);
+ {{value, {dedupe, _Fd, _Pos} = Key}, NewQ} ->
+ {value, Req} = ioq_q:lookup(Q, Key),
+ ioq_q:del(Q, Key),
+ {Req, lists:reverse([NewQ | Empties], Rest)};
{{value, Item}, NewQ} ->
{Item, lists:reverse([NewQ | Empties], Rest)}
end.