You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/07/23 20:13:17 UTC

[couchdb] 08/25: Simplify couch_views_worker_server

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/views
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 5c0b671302d778d870236ce2d6261b8dbf86dad4
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Jul 17 10:41:20 2019 -0500

    Simplify couch_views_worker_server
    
    This just follows the standard pattern of keeping a pool of workers
    alive that will accept jobs individually. This avoids all of the oddness
    in passing jobs around after they've been accepted.
    
    I've also renamed it couch_views_server so that the name is a bit less
    wordy.
---
 ...ws_worker_server.erl => couch_views_server.erl} | 69 +++++++++++-----------
 1 file changed, 36 insertions(+), 33 deletions(-)

diff --git a/src/couch_views/src/couch_views_worker_server.erl b/src/couch_views/src/couch_views_server.erl
similarity index 60%
rename from src/couch_views/src/couch_views_worker_server.erl
rename to src/couch_views/src/couch_views_server.erl
index 13bd9aa..8ec2425 100644
--- a/src/couch_views/src/couch_views_worker_server.erl
+++ b/src/couch_views/src/couch_views_server.erl
@@ -10,7 +10,7 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
--module(couch_views_worker_server).
+-module(couch_views_server).
 
 
 -behaviour(gen_server).
@@ -39,13 +39,13 @@ start_link() ->
 
 
 init(_) ->
+    process_flag(trap_exit, true),  % exits of linked processes reach handle_info as 'EXIT'
     couch_views_jobs:set_timeout(),
     State0 = #{
-        workers => #{},
-        acceptor_pid => undefined
+        workers => [],
+        num_workers => num_workers()
     },
-    State = spawn_acceptor(State0),
-    {ok, State}.
+    {ok, spawn_workers(State0)}.
 
 
 terminate(_, _St) ->
@@ -56,53 +56,51 @@ handle_call(Msg, _From, St) ->
     {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
 
 
-handle_cast({job, Job, JobData}, State) ->
-    State1 = start_worker(State, Job, JobData),
-    State2 = spawn_acceptor(State1),
-    {noreply, State2};
-
 handle_cast(Msg, St) ->
     {stop, {bad_cast, Msg}, St}.
 
 
-handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
-    LogMsg = "~p : process ~p exited with ~p",
-    couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
-    State1 = check_finished_process(State, Pid),
-    {noreply, State1};
+handle_info({'EXIT', Pid, Reason}, State) ->
+    #{workers := Workers} = State,
+    case Workers -- [Pid] of
+        Workers ->  % list unchanged: Pid was not one of our workers
+            LogMsg = "~p : unknown process ~p exited with ~p",
+            couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+            {stop, {unknown_pid_exit, Pid}, State};
+        NewWorkers ->
+            if Reason == normal -> ok; true ->
+                LogMsg = "~p : indexer process ~p exited with ~p",
+                couch_log:error(LogMsg, [?MODULE, Pid, Reason])
+            end,
+            {noreply, spawn_workers(State#{workers := NewWorkers})}
+    end;
 
 handle_info(Msg, St) ->
-    couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]),
-    {noreply, St}.
+    {stop, {bad_info, Msg}, St}.
 
 
 code_change(_OldVsn, St, _Extra) ->
     {ok, St}.
 
 
-start_worker(State, Job, JobData) ->
-    #{workers := Workers} = State,
-    {Pid, _Ref} = spawn_monitor(fun () -> couch_views_worker:start(Job, JobData) end),
-    Workers1 = Workers#{Pid => true},
-    State#{workers := Workers1}.
-
-
-spawn_acceptor(State) ->
+spawn_workers(State) ->
     #{
         workers := Workers,
-        acceptor_pid := Pid
+        num_workers := NumWorkers
     } = State,
-    MaxWorkers = config:get_integer("couch_views", "max_workers", ?MAX_WORKERS),
-    case maps:size(Workers) >= MaxWorkers of
-        false when not is_pid(Pid) ->
-            Parent = self(),
-            {Pid1, _Ref} = spawn_monitor(fun() -> blocking_acceptor(Parent) end),
-            State#{acceptor_pid := Pid1};
-        _ ->
+    case length(Workers) < NumWorkers of
+        true ->
+            Pid = spawn_worker(),
+            spawn_workers(State#{workers := [Pid | Workers]});
+        false ->
             State
     end.
 
 
+spawn_worker() ->
+    couch_views_indexer:spawn_link().
+
+
 blocking_acceptor(Parent) ->
     case couch_views_jobs:accept() of
         not_found ->
@@ -120,3 +118,8 @@ check_finished_process(State, Pid) ->
     #{workers := Workers} = State,
     Workers1 = maps:remove(Pid, Workers),
     State#{workers := Workers1}.
+
+
+
+num_workers() ->
+    config:get_integer("couch_views", "max_workers", ?MAX_WORKERS).