You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/10/27 21:08:05 UTC

[couchdb] branch smoosh-server-optimization created (now 47f9c723c)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch smoosh-server-optimization
in repository https://gitbox.apache.org/repos/asf/couchdb.git


      at 47f9c723c Optimize smoosh enqueuing

This branch includes the following new commits:

     new 47f9c723c Optimize smoosh enqueuing

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: Optimize smoosh enqueuing

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch smoosh-server-optimization
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 47f9c723c257d8599e87c20936c9c78c4dba29ca
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Thu Oct 27 17:05:36 2022 -0400

    Optimize smoosh enqueuing
    
    Make sure we don't do an O(n) filtering when enqueuing processes terminate.
    
    Also, cleanup an old code_change clause, and add a tiny optimization to avoid
    ununecessary external calls.
---
 src/smoosh/src/smoosh_server.erl | 71 +++++++++++++++++++++++++++-------------
 1 file changed, 49 insertions(+), 22 deletions(-)

diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl
index 2a55082c1..b7be1c104 100644
--- a/src/smoosh/src/smoosh_server.erl
+++ b/src/smoosh/src/smoosh_server.erl
@@ -63,7 +63,8 @@
     view_channels = [],
     tab,
     event_listener,
-    waiting = maps:new()
+    waiting = #{},
+    waiting_by_ref = #{}
 }).
 
 -record(channel, {
@@ -95,16 +96,16 @@ sync_enqueue(Object, Timeout) ->
     gen_server:call(?MODULE, {enqueue, Object}, Timeout).
 
 handle_db_event(DbName, local_updated, St) ->
-    smoosh_server:enqueue(DbName),
+    enqueue(DbName),
     {ok, St};
 handle_db_event(DbName, updated, St) ->
-    smoosh_server:enqueue(DbName),
+    enqueue(DbName),
     {ok, St};
 handle_db_event(DbName, {index_commit, IdxName}, St) ->
-    smoosh_server:enqueue({DbName, IdxName}),
+    enqueue({DbName, IdxName}),
     {ok, St};
 handle_db_event(DbName, {index_collator_upgrade, IdxName}, St) ->
-    smoosh_server:enqueue({DbName, IdxName}),
+    enqueue({DbName, IdxName}),
     {ok, St};
 handle_db_event(_DbName, _Event, St) ->
     {ok, St}.
@@ -193,14 +194,13 @@ handle_cast({new_view_channels, Channels}, State) ->
      || C <- State#state.view_channels -- Channels
     ],
     {noreply, create_missing_channels(State#state{view_channels = Channels})};
-handle_cast({enqueue, Object}, State) ->
-    #state{waiting = Waiting} = State,
+handle_cast({enqueue, Object}, #state{waiting = Waiting} = State) ->
     case maps:is_key(Object, Waiting) of
         true ->
             {noreply, State};
         false ->
             {_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]),
-            {noreply, State#state{waiting = maps:put(Object, Ref, Waiting)}}
+            {noreply, add_enqueue_ref(Object, Ref, State)}
     end.
 
 handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) ->
@@ -218,12 +218,8 @@ handle_info({'EXIT', Pid, Reason}, State) ->
             ok
     end,
     {noreply, create_missing_channels(State)};
-handle_info({'DOWN', Ref, _, _, _}, State) ->
-    Waiting = maps:filter(
-        fun(_Key, Value) -> Value =/= Ref end,
-        State#state.waiting
-    ),
-    {noreply, State#state{waiting = Waiting}};
+handle_info({'DOWN', Ref, process, _, _}, #state{} = State) ->
+    {noreply, remove_enqueue_ref(Ref, State)};
 handle_info(restart_config_listener, State) ->
     ok = config:listen_for_changes(?MODULE, nil),
     {noreply, State};
@@ -238,19 +234,23 @@ terminate(_Reason, State) ->
     ),
     ok.
 
-code_change(_OldVsn, {state, DbChannels, ViewChannels, Tab, EventListener, Waiting}, _Extra) ->
-    {ok, #state{
-        db_channels = DbChannels,
-        view_channels = ViewChannels,
-        tab = Tab,
-        event_listener = EventListener,
-        waiting = Waiting
-    }};
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 % private functions.
 
+add_enqueue_ref(Object, Ref, #state{} = State) when is_reference(Ref) ->
+    #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State,
+    Waiting1 = Waiting#{Object => Ref},
+    WaitingByRef1 = WaitingByRef#{Ref => Object},
+    State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}.
+
+remove_enqueue_ref(Ref, #state{} = State) when is_reference(Ref) ->
+    #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State,
+    {Object, WaitingByRef1} = maps:take(Ref, WaitingByRef),
+    {Ref, Waiting1} = maps:take(Object, Waiting),
+    State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}.
+
 get_channel_status(#channel{name = Name, pid = P}, Acc0) when is_pid(P) ->
     try gen_server:call(P, status) of
         {ok, Status} ->
@@ -663,4 +663,31 @@ config_listener_mon() ->
         [] -> undefined
     end.
 
+add_remove_enqueue_ref_test() ->
+    ObjCount = 100000,
+    ObjRefs = [{I, make_ref()} || I <- lists:seq(1, ObjCount)],
+
+    St = lists:foldl(
+        fun({I, Ref}, #state{} = Acc) ->
+            add_enqueue_ref(I, Ref, Acc)
+        end,
+        #state{},
+        ObjRefs
+    ),
+
+    ?assertEqual(ObjCount, map_size(St#state.waiting)),
+    ?assertEqual(ObjCount, map_size(St#state.waiting_by_ref)),
+
+    {_Objs, Refs} = lists:unzip(ObjRefs),
+    St1 = lists:foldl(
+        fun(Ref, #state{} = Acc) ->
+            remove_enqueue_ref(Ref, Acc)
+        end,
+        St,
+        Refs
+    ),
+
+    % It's basically back to an initial (empty) state
+    ?assertEqual(St1, #state{}).
+
 -endif.