You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/07/23 20:13:17 UTC
[couchdb] 08/25: Simplify couch_views_worker_server
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch prototype/views
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 5c0b671302d778d870236ce2d6261b8dbf86dad4
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Jul 17 10:41:20 2019 -0500
Simplify couch_views_worker_server
This just follows the standard pattern of keeping a pool of workers
alive that will accept jobs individually. This avoids all of the oddness
in passing jobs around after they've been accepted.
I've also renamed it couch_views_server so that the name is a bit less
wordy.
---
...ws_worker_server.erl => couch_views_server.erl} | 69 +++++++++++-----------
1 file changed, 36 insertions(+), 33 deletions(-)
diff --git a/src/couch_views/src/couch_views_worker_server.erl b/src/couch_views/src/couch_views_server.erl
similarity index 60%
rename from src/couch_views/src/couch_views_worker_server.erl
rename to src/couch_views/src/couch_views_server.erl
index 13bd9aa..8ec2425 100644
--- a/src/couch_views/src/couch_views_worker_server.erl
+++ b/src/couch_views/src/couch_views_server.erl
@@ -10,7 +10,7 @@
% License for the specific language governing permissions and limitations under
% the License.
--module(couch_views_worker_server).
+-module(couch_views_server).
-behaviour(gen_server).
@@ -39,13 +39,13 @@ start_link() ->
init(_) ->
+ process_flag(trap_exit, true),  % exits of linked processes arrive as {'EXIT', Pid, Reason} messages
couch_views_jobs:set_timeout(),
State0 = #{
- workers => #{},
- acceptor_pid => undefined
+ workers => [],  % pids of the live workers, kept as a list
+ num_workers => num_workers()  % pool size, read once at startup
},
- State = spawn_acceptor(State0),
- {ok, State}.
+ {ok, spawn_workers(State0)}.  % State0 is the only state bound here; fill the pool before returning
terminate(_, _St) ->
@@ -56,53 +56,51 @@ handle_call(Msg, _From, St) ->
{stop, {bad_call, Msg}, {bad_call, Msg}, St}.
-handle_cast({job, Job, JobData}, State) ->
- State1 = start_worker(State, Job, JobData),
- State2 = spawn_acceptor(State1),
- {noreply, State2};
-
handle_cast(Msg, St) ->
{stop, {bad_cast, Msg}, St}.
-handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
- LogMsg = "~p : process ~p exited with ~p",
- couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
- State1 = check_finished_process(State, Pid),
- {noreply, State1};
+handle_info({'EXIT', Pid, Reason}, State) ->
+ #{workers := Workers} = State,
+ case Workers -- [Pid] of  % drop Pid; an unchanged list means Pid was not a tracked worker
+ Workers ->
+ LogMsg = "~p : unknown process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {stop, {unknown_pid_exit, Pid}, State};
+ NewWorkers ->
+ if Reason == normal -> ok; true ->  % only abnormal worker exits are logged
+ LogMsg = "~p : indexer process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason])  % last expression of the branch: no comma before 'end'
+ end,
+ {noreply, spawn_workers(State#{workers := NewWorkers})}  % refill the pool to num_workers
+ end;
handle_info(Msg, St) ->
- couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]),
- {noreply, St}.
+ {stop, {bad_info, Msg}, St}.  % any other message stops the server with bad_info
code_change(_OldVsn, St, _Extra) ->
{ok, St}.
-start_worker(State, Job, JobData) ->
- #{workers := Workers} = State,
- {Pid, _Ref} = spawn_monitor(fun () -> couch_views_worker:start(Job, JobData) end),
- Workers1 = Workers#{Pid => true},
- State#{workers := Workers1}.
-
-
-spawn_acceptor(State) ->
+spawn_workers(State) ->  % Start workers until the pool holds num_workers pids; returns the updated state.
#{
workers := Workers,
- acceptor_pid := Pid
+ num_workers := NumWorkers
} = State,
- MaxWorkers = config:get_integer("couch_views", "max_workers", ?MAX_WORKERS),
- case maps:size(Workers) >= MaxWorkers of
- false when not is_pid(Pid) ->
- Parent = self(),
- {Pid1, _Ref} = spawn_monitor(fun() -> blocking_acceptor(Parent) end),
- State#{acceptor_pid := Pid1};
- _ ->
+ case length(Workers) < NumWorkers of  % pool below target size?
+ true ->
+ Pid = spawn_worker(),
+ spawn_workers(State#{workers := [Pid | Workers]});  % record the new pid, then recurse until full
+ false ->
State
end.
+spawn_worker() ->  % Start one indexer; the result is stored as a worker pid by spawn_workers/1.
+ couch_views_indexer:spawn_link().  % presumably returns the pid of a process linked to the caller - confirm in couch_views_indexer
+
+
blocking_acceptor(Parent) ->
case couch_views_jobs:accept() of
not_found ->
@@ -120,3 +118,8 @@ check_finished_process(State, Pid) ->
#{workers := Workers} = State,
Workers1 = maps:remove(Pid, Workers),
State#{workers := Workers1}.
+
+
+
+num_workers() ->  % Target pool size, taken from the "couch_views" / "max_workers" config entry.
+ config:get_integer("couch_views", "max_workers", ?MAX_WORKERS).  % ?MAX_WORKERS is presumably the fallback when the key is unset