You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/10/27 21:08:05 UTC
[couchdb] branch smoosh-server-optimization created (now 47f9c723c)
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a change to branch smoosh-server-optimization
in repository https://gitbox.apache.org/repos/asf/couchdb.git
at 47f9c723c Optimize smoosh enqueuing
This branch includes the following new commits:
new 47f9c723c Optimize smoosh enqueuing
The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[couchdb] 01/01: Optimize smoosh enqueuing
Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch smoosh-server-optimization
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 47f9c723c257d8599e87c20936c9c78c4dba29ca
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Thu Oct 27 17:05:36 2022 -0400
Optimize smoosh enqueuing
Make sure we don't do an O(n) filtering when enqueuing processes terminate.
Also, cleanup an old code_change clause, and add a tiny optimization to avoid
ununecessary external calls.
---
src/smoosh/src/smoosh_server.erl | 71 +++++++++++++++++++++++++++-------------
1 file changed, 49 insertions(+), 22 deletions(-)
diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl
index 2a55082c1..b7be1c104 100644
--- a/src/smoosh/src/smoosh_server.erl
+++ b/src/smoosh/src/smoosh_server.erl
@@ -63,7 +63,8 @@
view_channels = [],
tab,
event_listener,
- waiting = maps:new()
+ waiting = #{},
+ waiting_by_ref = #{}
}).
-record(channel, {
@@ -95,16 +96,16 @@ sync_enqueue(Object, Timeout) ->
gen_server:call(?MODULE, {enqueue, Object}, Timeout).
handle_db_event(DbName, local_updated, St) ->
- smoosh_server:enqueue(DbName),
+ enqueue(DbName),
{ok, St};
handle_db_event(DbName, updated, St) ->
- smoosh_server:enqueue(DbName),
+ enqueue(DbName),
{ok, St};
handle_db_event(DbName, {index_commit, IdxName}, St) ->
- smoosh_server:enqueue({DbName, IdxName}),
+ enqueue({DbName, IdxName}),
{ok, St};
handle_db_event(DbName, {index_collator_upgrade, IdxName}, St) ->
- smoosh_server:enqueue({DbName, IdxName}),
+ enqueue({DbName, IdxName}),
{ok, St};
handle_db_event(_DbName, _Event, St) ->
{ok, St}.
@@ -193,14 +194,13 @@ handle_cast({new_view_channels, Channels}, State) ->
|| C <- State#state.view_channels -- Channels
],
{noreply, create_missing_channels(State#state{view_channels = Channels})};
-handle_cast({enqueue, Object}, State) ->
- #state{waiting = Waiting} = State,
+handle_cast({enqueue, Object}, #state{waiting = Waiting} = State) ->
case maps:is_key(Object, Waiting) of
true ->
{noreply, State};
false ->
{_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]),
- {noreply, State#state{waiting = maps:put(Object, Ref, Waiting)}}
+ {noreply, add_enqueue_ref(Object, Ref, State)}
end.
handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) ->
@@ -218,12 +218,8 @@ handle_info({'EXIT', Pid, Reason}, State) ->
ok
end,
{noreply, create_missing_channels(State)};
-handle_info({'DOWN', Ref, _, _, _}, State) ->
- Waiting = maps:filter(
- fun(_Key, Value) -> Value =/= Ref end,
- State#state.waiting
- ),
- {noreply, State#state{waiting = Waiting}};
+handle_info({'DOWN', Ref, process, _, _}, #state{} = State) ->
+ {noreply, remove_enqueue_ref(Ref, State)};
handle_info(restart_config_listener, State) ->
ok = config:listen_for_changes(?MODULE, nil),
{noreply, State};
@@ -238,19 +234,23 @@ terminate(_Reason, State) ->
),
ok.
-code_change(_OldVsn, {state, DbChannels, ViewChannels, Tab, EventListener, Waiting}, _Extra) ->
- {ok, #state{
- db_channels = DbChannels,
- view_channels = ViewChannels,
- tab = Tab,
- event_listener = EventListener,
- waiting = Waiting
- }};
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
% private functions.
+add_enqueue_ref(Object, Ref, #state{} = State) when is_reference(Ref) ->
+ #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State,
+ Waiting1 = Waiting#{Object => Ref},
+ WaitingByRef1 = WaitingByRef#{Ref => Object},
+ State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}.
+
+remove_enqueue_ref(Ref, #state{} = State) when is_reference(Ref) ->
+ #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State,
+ {Object, WaitingByRef1} = maps:take(Ref, WaitingByRef),
+ {Ref, Waiting1} = maps:take(Object, Waiting),
+ State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}.
+
get_channel_status(#channel{name = Name, pid = P}, Acc0) when is_pid(P) ->
try gen_server:call(P, status) of
{ok, Status} ->
@@ -663,4 +663,31 @@ config_listener_mon() ->
[] -> undefined
end.
+add_remove_enqueue_ref_test() ->
+ ObjCount = 100000,
+ ObjRefs = [{I, make_ref()} || I <- lists:seq(1, ObjCount)],
+
+ St = lists:foldl(
+ fun({I, Ref}, #state{} = Acc) ->
+ add_enqueue_ref(I, Ref, Acc)
+ end,
+ #state{},
+ ObjRefs
+ ),
+
+ ?assertEqual(ObjCount, map_size(St#state.waiting)),
+ ?assertEqual(ObjCount, map_size(St#state.waiting_by_ref)),
+
+ {_Objs, Refs} = lists:unzip(ObjRefs),
+ St1 = lists:foldl(
+ fun(Ref, #state{} = Acc) ->
+ remove_enqueue_ref(Ref, Acc)
+ end,
+ St,
+ Refs
+ ),
+
+ % It's basically back to an initial (empty) state
+ ?assertEqual(St1, #state{}).
+
-endif.