You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/10/27 21:08:06 UTC

[couchdb] 01/01: Optimize smoosh enqueuing

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch smoosh-server-optimization
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 47f9c723c257d8599e87c20936c9c78c4dba29ca
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Thu Oct 27 17:05:36 2022 -0400

    Optimize smoosh enqueuing
    
    Make sure we don't do an O(n) filtering when enqueuing processes terminate.
    
    Also, clean up an old code_change clause, and add a tiny optimization to avoid
    unnecessary external calls.
---
 src/smoosh/src/smoosh_server.erl | 71 +++++++++++++++++++++++++++-------------
 1 file changed, 49 insertions(+), 22 deletions(-)

diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl
index 2a55082c1..b7be1c104 100644
--- a/src/smoosh/src/smoosh_server.erl
+++ b/src/smoosh/src/smoosh_server.erl
@@ -63,7 +63,8 @@
     view_channels = [],
     tab,
     event_listener,
-    waiting = maps:new()
+    waiting = #{},
+    waiting_by_ref = #{}
 }).
 
 -record(channel, {
@@ -95,16 +96,16 @@ sync_enqueue(Object, Timeout) ->
     gen_server:call(?MODULE, {enqueue, Object}, Timeout).
 
 handle_db_event(DbName, local_updated, St) ->
-    smoosh_server:enqueue(DbName),
+    enqueue(DbName),
     {ok, St};
 handle_db_event(DbName, updated, St) ->
-    smoosh_server:enqueue(DbName),
+    enqueue(DbName),
     {ok, St};
 handle_db_event(DbName, {index_commit, IdxName}, St) ->
-    smoosh_server:enqueue({DbName, IdxName}),
+    enqueue({DbName, IdxName}),
     {ok, St};
 handle_db_event(DbName, {index_collator_upgrade, IdxName}, St) ->
-    smoosh_server:enqueue({DbName, IdxName}),
+    enqueue({DbName, IdxName}),
     {ok, St};
 handle_db_event(_DbName, _Event, St) ->
     {ok, St}.
@@ -193,14 +194,13 @@ handle_cast({new_view_channels, Channels}, State) ->
      || C <- State#state.view_channels -- Channels
     ],
     {noreply, create_missing_channels(State#state{view_channels = Channels})};
-handle_cast({enqueue, Object}, State) ->
-    #state{waiting = Waiting} = State,
+handle_cast({enqueue, Object}, #state{waiting = Waiting} = State) ->
     case maps:is_key(Object, Waiting) of
         true ->
             {noreply, State};
         false ->
             {_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]),
-            {noreply, State#state{waiting = maps:put(Object, Ref, Waiting)}}
+            {noreply, add_enqueue_ref(Object, Ref, State)}
     end.
 
 handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) ->
@@ -218,12 +218,8 @@ handle_info({'EXIT', Pid, Reason}, State) ->
             ok
     end,
     {noreply, create_missing_channels(State)};
-handle_info({'DOWN', Ref, _, _, _}, State) ->
-    Waiting = maps:filter(
-        fun(_Key, Value) -> Value =/= Ref end,
-        State#state.waiting
-    ),
-    {noreply, State#state{waiting = Waiting}};
+handle_info({'DOWN', Ref, process, _, _}, #state{} = State) ->
+    {noreply, remove_enqueue_ref(Ref, State)};
 handle_info(restart_config_listener, State) ->
     ok = config:listen_for_changes(?MODULE, nil),
     {noreply, State};
@@ -238,19 +234,23 @@ terminate(_Reason, State) ->
     ),
     ok.
 
-code_change(_OldVsn, {state, DbChannels, ViewChannels, Tab, EventListener, Waiting}, _Extra) ->
-    {ok, #state{
-        db_channels = DbChannels,
-        view_channels = ViewChannels,
-        tab = Tab,
-        event_listener = EventListener,
-        waiting = Waiting
-    }};
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 % private functions.
 
+add_enqueue_ref(Object, Ref, #state{} = State) when is_reference(Ref) ->
+    #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State,
+    Waiting1 = Waiting#{Object => Ref},
+    WaitingByRef1 = WaitingByRef#{Ref => Object},
+    State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}.
+
+remove_enqueue_ref(Ref, #state{} = State) when is_reference(Ref) ->
+    #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State,
+    {Object, WaitingByRef1} = maps:take(Ref, WaitingByRef),
+    {Ref, Waiting1} = maps:take(Object, Waiting),
+    State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}.
+
 get_channel_status(#channel{name = Name, pid = P}, Acc0) when is_pid(P) ->
     try gen_server:call(P, status) of
         {ok, Status} ->
@@ -663,4 +663,31 @@ config_listener_mon() ->
         [] -> undefined
     end.
 
+add_remove_enqueue_ref_test() ->
+    ObjCount = 100000,
+    ObjRefs = [{I, make_ref()} || I <- lists:seq(1, ObjCount)],
+
+    St = lists:foldl(
+        fun({I, Ref}, #state{} = Acc) ->
+            add_enqueue_ref(I, Ref, Acc)
+        end,
+        #state{},
+        ObjRefs
+    ),
+
+    ?assertEqual(ObjCount, map_size(St#state.waiting)),
+    ?assertEqual(ObjCount, map_size(St#state.waiting_by_ref)),
+
+    {_Objs, Refs} = lists:unzip(ObjRefs),
+    St1 = lists:foldl(
+        fun(Ref, #state{} = Acc) ->
+            remove_enqueue_ref(Ref, Acc)
+        end,
+        St,
+        Refs
+    ),
+
+    % It's basically back to an initial (empty) state
+    ?assertEqual(St1, #state{}).
+
 -endif.